Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Data] AlltoAll OP, Update Data progress bars to use row as the iteration unit #46924

Merged
merged 15 commits into from
Aug 12, 2024

Conversation

Bye-legumes
Copy link
Contributor

Why are these changes needed?

close #46579

Related issue number

Checks

  • [√] I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • [√] I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • [√] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [√] Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: zhilong <[email protected]>
@scottjlee scottjlee self-assigned this Aug 1, 2024
@Bye-legumes Bye-legumes changed the title [Data] AlltoAll OP, Update Data progress bars to use row as the iteration unit [WIP][Data] AlltoAll OP, Update Data progress bars to use row as the iteration unit Aug 2, 2024
Copy link
Contributor

@scottjlee scottjlee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also show an example screenshot of a progress bar output with a mix of OneToOne and AllToAll operators?

Comment on lines 84 to 86
if self._num_outputs:
return sum(bundle.num_rows() for bundle in self._output_buffer)
return self.input_dependencies[0].num_output_rows_total()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think here, we can simply return the input dependency's num_output_rows_total(). Because we don't know when/if self._output_buffer is completed, this may give a lower total row count as the AllToAllOperator continues to execute and more items are added into self._output_buffer.

Suggested change
if self._num_outputs:
return sum(bundle.num_rows() for bundle in self._output_buffer)
return self.input_dependencies[0].num_output_rows_total()
return self.input_dependencies[0].num_output_rows_total()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed!

@Bye-legumes
Copy link
Contributor Author

Bye-legumes commented Aug 6, 2024

Could you also show an example screenshot of a progress bar output with a mix of OneToOne and AllToAll operators?
This is a example of sort and how it look like.
image

@Bye-legumes Bye-legumes changed the title [WIP][Data] AlltoAll OP, Update Data progress bars to use row as the iteration unit [Data] AlltoAll OP, Update Data progress bars to use row as the iteration unit Aug 6, 2024
@scottjlee
Copy link
Contributor

Could you also show an example screenshot of a progress bar output with a mix of OneToOne and AllToAll operators?
This is a example of sort and how it look like.
image

For the Sort and Sort Sample progress bars, I would expect that these bars also report row/s throughput, but they seem to be unknown ?. is it possible to also correct the bar behavior for this case? thanks!

Signed-off-by: zhilong <[email protected]>
@Bye-legumes
Copy link
Contributor Author

Could you also show an example screenshot of a progress bar output with a mix of OneToOne and AllToAll operators?
This is a example of sort and how it look like.
image

For the Sort and Sort Sample progress bars, I would expect that these bars also report row/s throughput, but they seem to be unknown ?. is it possible to also correct the bar behavior for this case? thanks!

fixed now. I just need to change to def num_output_rows_total(self) -> Optional[int]: return ( self._output_rows if self._output_rows else self.input_dependencies[0].num_outputs_total() ) similar to the number of bundles.
image

@scottjlee
Copy link
Contributor

Also not sure if this is related to the fix I mentioned above, but the global progress bar also shows in bundles (197/197). Could you confirm whether this is still an issue after updating to use num_output_rows_total()?

@Bye-legumes
Copy link
Contributor Author

Also not sure if this is related to the fix I mentioned above, but the global progress bar also shows in bundles (197/197). Could you confirm whether this is still an issue after updating to use num_output_rows_total()?

Here is the current screenshot...Your are right.. This still cannot fix that.. Let me check if there are are place that I need to modify...
image
Here is the codes that I used for testing and let me check how the parallelism influenced the rows..

import ray
ray.init(address = "10.193.182.83:6274")
ctx = ray.data.context.DatasetContext.get_current()
use_push_based_shuffle = False
num_items = 30001
parallelism = 200
import pandas as pd
import numpy as np
import time


t1 = time.time()
original = ctx.use_push_based_shuffle
ctx.use_push_based_shuffle = use_push_based_shuffle

a = list(reversed(range(num_items)))

shard = int(np.ceil(num_items / parallelism))
b = [1]*1
offset = 0
dfs = []
while offset < num_items:
    dfs.append(
        pd.DataFrame(
            {"a": a[offset : offset + shard], "b": [b]*len(a[offset : offset + shard])}
        )
    )
    offset += shard
if offset < num_items:
    dfs.append(pd.DataFrame({"a": a[offset:], "b": b[offset:]}))
ds = ray.data.from_pandas(dfs)
sorted_ds = ds.sort(key="a")
res = [tuple(row.values()) for row in sorted_ds.iter_rows()]
print(f"time used : \n{time.time()-t1}")

@Bye-legumes
Copy link
Contributor Author

I think for the above, it's related to here

I many need to modify the task specification..

@scottjlee
Copy link
Contributor

I think for the above, it's related to here

I many need to modify the task specification..

i think that one is related to the bar with Sort Sample. For this one, since the task specification is a bit more involved, we can do this in a followup. But for the global bar (Dataset execution finished in ...), the output unit still shows as blocks or bundles, which I think we should change to rows.

Signed-off-by: zhilong <[email protected]>
Signed-off-by: zhilong <[email protected]>
@Bye-legumes
Copy link
Contributor Author

Bye-legumes commented Aug 7, 2024

I think for the above, it's related to here

I many need to modify the task specification..

i think that one is related to the bar with Sort Sample. For this one, since the task specification is a bit more involved, we can do this in a followup. But for the global bar (Dataset execution finished in ...), the output unit still shows as blocks or bundles, which I think we should change to rows.

oh. OK, I see! I am still try to change the sort sample. But if it can be a followup for all other shuffle ops, I think it's OK now and here is what looks like. The modification to fetch_until_complete will works for all shuffle ops I think
image

Signed-off-by: zhilong <[email protected]>
Comment on lines +164 to +169
num_rows = (
result.num_rows if hasattr(result, "num_rows") else 1
) # Default to 1 if no row count is available
total_rows_processed += num_rows
# TODO(zhilong): Change the total to total_row when init progress bar
self.update(total_rows_processed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

Copy link
Contributor

@scottjlee scottjlee left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the improvement @Bye-legumes !

Copy link
Contributor

@omatthew98 omatthew98 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions but lgtm otherwise!

return (
self._output_rows
if self._output_rows
else self.input_dependencies[0].num_output_rows_total()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is self.input_dependencies[0].num_output_rows_total() something that is static? Should we cache this value with some call like self._output_rows = self.input_dependencies[0].num_output_rows_total()?

If this total is a live total that is updated as execution continues makes sense to leave as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right! Here the self._output_rows is not static, but it's our primary option, as it will be update here
image

Signed-off-by: zhilong <[email protected]>
@scottjlee scottjlee added the go add ONLY when ready to merge, run all tests label Aug 10, 2024
@anyscalesam anyscalesam added P1 Issue that should be fixed within a few weeks data Ray Data-related issues labels Aug 12, 2024
@scottjlee scottjlee merged commit 872ce54 into ray-project:master Aug 12, 2024
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
data Ray Data-related issues go add ONLY when ready to merge, run all tests P1 Issue that should be fixed within a few weeks
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Data] Update Data progress bars to use row as the iteration unit
4 participants