
[Datasets] Support different number of blocks/rows per block in zip(). #32795

Merged
merged 3 commits into from
Feb 24, 2023

Conversation

clarkzinzow
Contributor

This PR adds support for a different number of blocks/rows per block in ds1.zip(ds2), by aligning the blocks in ds2 to ds1 with a lightweight repartition/block splitting.

Design

We heavily utilize the block splitting machinery that's used for ds.split() and ds.split_at_indices() to avoid an overly expensive repartition. Namely, for ds1.zip(ds2), we:

  1. Calculate the block sizes for ds1 in order to get split indices.
  2. Apply _split_at_indices() to ds2 in order to get a list of ds2 block chunks for every block in ds1, such that self_block.num_rows() == sum(other_block.num_rows() for other_block in other_split_blocks) for every self_block in ds1.
  3. Zip together each block in ds1 with the one or more blocks from ds2 that constitute the block-aligned split for that ds1 block.
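The three steps above can be sketched in plain Python, with lists of rows standing in for blocks. This is an illustrative simplification, not the actual Ray Datasets internals: here `split_at_indices` returns each split as a single chunk, whereas the real `_split_at_indices()` returns a list of block chunks per split.

```python
from itertools import accumulate

def split_at_indices(blocks, indices):
    """Split a list of row-blocks at the given global row offsets.

    Simplified stand-in for _split_at_indices(): returns one chunk per
    output partition instead of a list of block chunks.
    """
    rows = [row for block in blocks for row in block]  # flatten all blocks
    bounds = [0] + list(indices) + [len(rows)]
    return [rows[a:b] for a, b in zip(bounds[:-1], bounds[1:])]

def zip_datasets(ds1_blocks, ds2_blocks):
    # 1. Block sizes of ds1 give the split indices.
    sizes = [len(b) for b in ds1_blocks]
    indices = list(accumulate(sizes))[:-1]
    # 2. Split ds2 so each chunk lines up with one ds1 block.
    ds2_aligned = split_at_indices(ds2_blocks, indices)
    # 3. Zip each ds1 block with its aligned ds2 chunk, row by row.
    return [list(zip(b1, b2)) for b1, b2 in zip(ds1_blocks, ds2_aligned)]

# Different block layouts: ds1 has blocks of 2 and 3 rows, ds2 has 1/1/3.
ds1 = [[1, 2], [3, 4, 5]]
ds2 = [["a"], ["b"], ["c", "d", "e"]]
print(zip_datasets(ds1, ds2))
# [[(1, 'a'), (2, 'b')], [(3, 'c'), (4, 'd'), (5, 'e')]]
```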

Related issue number

Closes #32375

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Contributor

@ericl ericl left a comment


LGTM at high level

@clarkzinzow
Contributor Author

Tests look ok, @c21 @jianoaix could one of y'all take a look?

)
return blocks, {}

super().__init__("zip", None, do_zip_all)


def _do_zip(block: Block, *other_blocks: Block) -> (Block, BlockMetadata):
Contributor
Annotate other_blocks as List[Block] for readability?

Contributor Author

Variadic args should have the type annotation of a single one of the arguments, not the collection. https://peps.python.org/pep-0484/#arbitrary-argument-lists-and-default-argument-values
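A tiny example of the PEP 484 rule being cited: the annotation on a `*args` parameter names the type of each individual argument, while inside the function body the parameter is a tuple of that type.

```python
def variadic_sum(*nums: int) -> int:
    # Each positional arg is annotated as int; inside the body, `nums`
    # is a Tuple[int, ...], per PEP 484.
    return sum(nums)

print(variadic_sum(1, 2, 3))  # 6
```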

Contributor
It's List[ObjectRef[Block]] from the splitting output; any reason for this to be a variadic arg?

Contributor Author

@clarkzinzow clarkzinzow Feb 24, 2023

This is used as a Ray task function, and Ray only resolves object refs that are top-level arguments. We therefore want each of these data blocks as a top-level argument so that we (1) get automatic materialization, (2) ensure the task isn't scheduled until all blocks are resolved, and (3) take advantage of locality-aware scheduling of the task. We wouldn't get any of those three things if we did a ray.get() in the task function, if that's what you're recommending.

We use this same pattern elsewhere, whenever we send a variable number of data blocks to a Ray task, we destructure it into a variadic arg so all of the above happens.

Contributor

ok, makes sense!

python/ray/data/_internal/stage_impl.py (conversation resolved)
@c21
Contributor

c21 commented Feb 24, 2023

Btw, it would be great if we can create a benchmark for zip() from what we learned.
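A minimal timing harness one might start such a benchmark from. This is an illustrative sketch, not Ray's release-test framework: plain Python lists stand in for blocks, and the zipped workload is a simple row-wise zip over pre-aligned layouts.

```python
import time

def bench(fn, *args, repeat=5):
    """Return the best-of-`repeat` wall time for fn(*args)."""
    times = []
    for _ in range(repeat):
        t0 = time.perf_counter()
        fn(*args)
        times.append(time.perf_counter() - t0)
    return min(times)

# Two datasets with identical row layouts, as stand-in blocks.
ds1 = [list(range(1000)) for _ in range(100)]
ds2 = [list(range(1000)) for _ in range(100)]
rowwise_zip = lambda a, b: [list(zip(x, y)) for x, y in zip(a, b)]
print(f"best of 5: {bench(rowwise_zip, ds1, ds2):.4f}s")
```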

@ericl ericl added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Feb 24, 2023
python/ray/data/dataset.py (outdated; conversation resolved)
@ericl ericl merged commit e3f875c into ray-project:master Feb 24, 2023
clarkzinzow added a commit to clarkzinzow/ray that referenced this pull request Mar 3, 2023
zcin pushed a commit that referenced this pull request Mar 3, 2023 (#32998)
amogkam added a commit that referenced this pull request Mar 7, 2023
edoakes pushed a commit to edoakes/ray that referenced this pull request Mar 22, 2023
peytondmurray pushed a commit to peytondmurray/ray that referenced this pull request Mar 22, 2023
elliottower pushed a commit to elliottower/ray that referenced this pull request Apr 22, 2023
Labels
@author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer.
Development

Successfully merging this pull request may close these issues.

[Datasets] Improve user experience of zip()
4 participants