
[Datasets] Change sampling to use same API as read Parquet #28258

Merged
merged 5 commits into ray-project:master from c21:sample-fix
Sep 8, 2022

Conversation

@c21 (Contributor) commented Sep 2, 2022

Signed-off-by: Cheng Su [email protected]

Why are these changes needed?

Found a sampling OOM issue in #28230; after debugging, I found the issue is due to the batch_size passed when reading Parquet. Previously we set batch_size=5, but it causes too much overhead when reading the files in #28230 (where the on-disk file size is 2GB). So here I change the code to set batch_size to a larger number, 1024. In the meantime, restrict the number of rows to sample to no more than the first row group, as suggested in https://lists.apache.org/thread/dq6g7yyt6jl8r6pcpgokl13cfyg6vdml .

Tested on the nightly test (with 400GB of files in total), and the nightly test finished successfully before the timeout. Sampling 2 files, each 2GB on disk, roughly takes 14 seconds now.

This time looks reasonable to me, so I think it's better to have the same behavior between sampling and reading, to avoid any future surprise, even though one batch is large now.

```
Parquet Files Sample: 100%|██████████| 2/2 [00:14<00:00,  7.23s/it]
```
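For context, here is a minimal sketch of the sampling approach described above: read with the same to_batches() API as the full read path, with batch_size=1024, and cap the sample at the first row group. `piece` is assumed to be a pyarrow.dataset.ParquetFileFragment; the helper and constant names are illustrative, not Ray's exact code.

```
import pyarrow as pa

PARQUET_READER_ROW_BATCH_SIZE = 1024  # same batch size as the full read path

def sample_piece(piece, columns=None, schema=None, **reader_args):
    # Never sample past the first row group, per the Arrow mailing-list advice.
    remaining = piece.metadata.row_group(0).num_rows
    sampled = []
    for batch in piece.to_batches(
        columns=columns,
        schema=schema,
        batch_size=PARQUET_READER_ROW_BATCH_SIZE,
        **reader_args,
    ):
        if remaining <= 0:
            break
        sampled.append(batch.slice(0, min(remaining, batch.num_rows)))
        remaining -= batch.num_rows
    # Assumes at least one batch was read; pass an explicit schema otherwise.
    return pa.Table.from_batches(sampled, schema=schema)
```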

Related issue number

#28230

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

@clarkzinzow (Contributor) left a comment:

I'm a bit worried that we're going to be reading 20,000x more data with this new sampling.

Do we know why the small batch size was causing a lot of extra overhead in Arrow? Is it a bug in the head() implementation?

python/ray/data/datasource/parquet_datasource.py

```
batches = piece.to_batches(
    columns=columns,
    schema=schema,
    batch_size=PARQUET_READER_ROW_BATCH_SIZE,
    **reader_args,
)
```

Contributor:

I'm a bit concerned that we're going to be reading a lot more data now, going from 5 rows to 100k rows, which could be a lot slower/heavier for very wide tables.

@c21 (Contributor Author):

Yeah, I share the same concern. Reading 2GB in roughly 14 seconds is the baseline we have now, which is not too bad. Let me explore more to see if I can find anything better.

@c21 (Contributor Author) commented Sep 2, 2022

> Do we know why the small batch size was causing a lot of extra overhead in Arrow? Is it a bug in the head() implementation?

Unfortunately we don't know yet, and I'm going to ask on the Arrow mailing list. It's not a bug in the head() implementation; I tried with to_batches():

```
batches = piece.to_batches(
    columns=columns,
    schema=schema,
    batch_size=5,
    **reader_args,
)
```

and it also ran slowly and hit OOM. So I guess there might be some exponential overhead associated with batch_size when its value is small.

@c21 (Contributor Author) commented Sep 6, 2022

@clarkzinzow - based on the discussion in https://lists.apache.org/thread/dq6g7yyt6jl8r6pcpgokl13cfyg6vdml, the Arrow Parquet reader has a readahead feature beyond batch size, and an extremely small batch size incurs a lot of readahead overhead, as we saw here. So I think the best options for us now are:

  • For now, sample only the first row group, as the Arrow folks suggested (and restrict batch_size to 100000, the same as read, to avoid any inconsistent surprise when a row group is large).
  • After the Arrow 10.0.0 release, explore the new readahead option (right now it's not exposed); see the sketch after this list.
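For the second option, a hedged sketch of what tuning readahead could look like once it's exposed; the batch_readahead/fragment_readahead parameter names are the ones pyarrow 10.0 later added to the dataset scanner, and the file path and values here are illustrative only.

```
import pyarrow.dataset as ds

dataset = ds.dataset("example.parquet", format="parquet")  # hypothetical file
scanner = dataset.scanner(
    batch_size=5,
    batch_readahead=1,     # keep batch readahead to a minimum
    fragment_readahead=1,  # likewise for fragment readahead
)
first_batch = next(iter(scanner.to_batches()))
```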

WDYT?

@clarkzinzow (Contributor):

Batch readahead should be disabled if use_threads is set to False, as we already do for the actual file reading, so we should be able to make readahead a non-factor.

It looks like we weren't doing this for the file sampling, resulting in concurrent readaheads. If you have a quick benchmarking script handy, could you try it out with use_threads=False and a small batch size?
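A hedged sketch of such a quick benchmark (the file path is hypothetical, and the harness is a stand-in for the actual nightly-test setup):

```
import time
import pyarrow.dataset as ds

dataset = ds.dataset("example.parquet", format="parquet")
fragment = next(iter(dataset.get_fragments()))

start = time.perf_counter()
# Single-threaded read with the original small batch size.
first_batch = next(iter(fragment.to_batches(batch_size=5, use_threads=False)))
print(f"first batch ({first_batch.num_rows} rows) in {time.perf_counter() - start:.2f}s")
```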

@c21 (Contributor Author) commented Sep 7, 2022

Discussed with @clarkzinzow offline:

  • I tried setting batch_size=5 and use_threads=False; the file read becomes pretty slow, taking more than 5 minutes for a single file, so it doesn't work. So we guess there is probably still some other overhead going on even when we disable use_threads.
  • I also tried batch_size=1024; it works well, with a similar time to batch_size=100000 (see the comparison sketch after this comment), and a 1024-row batch is much more acceptable than a 100000-row one. Spark and Arrow Rust also use 1024 rows as the batch size when reading Parquet files.

So here we change batch_size to 1024. The nightly test also ran successfully: https://console.anyscale.com/o/anyscale-internal/projects/prj_2xR6uT6t7jJuu1aCwWMsle/clusters/ses_DQgxh91xNpBJQGbH2zcnTXpW?command-history-section=command_history&drivers-section=deployments
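For reference, a minimal sketch of the batch-size comparison described above (hypothetical file path; not the exact benchmark that was run):

```
import time
import pyarrow.dataset as ds

fragment = next(iter(ds.dataset("example.parquet", format="parquet").get_fragments()))

for batch_size in (1024, 100_000):
    start = time.perf_counter()
    next(iter(fragment.to_batches(batch_size=batch_size)))
    print(f"batch_size={batch_size}: first batch in {time.perf_counter() - start:.2f}s")
```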

@c21 c21 changed the title [Datasets] Change sampling to use batch_size as read Parquet [Datasets] Change sampling to use same API as read Parquet Sep 7, 2022
@clarkzinzow (Contributor) left a comment:

LGTM, thanks for trying this out!

@ericl ericl merged commit c2be475 into ray-project:master Sep 8, 2022
@c21 c21 deleted the sample-fix branch September 8, 2022 20:32
ilee300a pushed a commit to ilee300a/ray that referenced this pull request Sep 12, 2022
justinvyu pushed a commit to justinvyu/ray that referenced this pull request Sep 14, 2022