[Datasets] Change sampling to use same API as read Parquet (#28258)
Found a sampling OOM issue in #28230. After debugging, I found the issue is due to the `batch_size` passed when reading Parquet. Previously we set `batch_size=5`, but it caused too much overhead when reading files in #28230 (where the on-disk file size is 2GB). So here I change the code to set `batch_size` to a larger number, 1024. At the same time, the number of rows to sample is restricted to no more than the first row group, as suggested in https://lists.apache.org/thread/dq6g7yyt6jl8r6pcpgokl13cfyg6vdml.

Tested on the nightly test (with 400GB of files in total), and [the nightly test finished successfully before the timeout](https://console.anyscale.com/o/anyscale-internal/projects/prj_2xR6uT6t7jJuu1aCwWMsle/clusters/ses_DQgxh91xNpBJQGbH2zcnTXpW?command-history-section=command_history&drivers-section=deployments.). Sampling 2 files, each 2GB on disk, now takes roughly 14 seconds.

This time looks reasonable to me, so I think it's better to have the same behavior between sampling and reading, to avoid any future surprises, even though one batch is larger now.

```
Parquet Files Sample: 100%|██████████| 2/2 [00:14<00:00,  7.23s/it]
```
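As a rough standalone illustration of the estimation approach described above (not the Ray Datasets code itself), the sketch below uses PyArrow to read one batch of up to 1024 rows from the first row group of a Parquet file and compares its in-memory size per row to the file's on-disk size per row. The file path is a placeholder.

```python
import pyarrow.parquet as pq

# Hypothetical sketch of the encoding-ratio estimation idea:
# sample up to 1024 rows from the first row group only, then compare
# in-memory bytes per row against on-disk bytes per row.
pf = pq.ParquetFile("example.parquet")  # placeholder path
metadata = pf.metadata

batch_size = min(metadata.num_rows, 1024)
# Restrict the read to the first row group, as the commit does.
batch = next(pf.iter_batches(batch_size=batch_size, row_groups=[0]))

in_memory_size = batch.nbytes / batch.num_rows
on_disk_size = sum(
    metadata.row_group(i).total_byte_size for i in range(metadata.num_row_groups)
) / metadata.num_rows

print(f"Estimated encoding ratio: {in_memory_size / on_disk_size:.2f}")
```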
c21 authored Sep 8, 2022
1 parent b1db7aa commit c2be475
Showing 1 changed file with 37 additions and 26 deletions.
python/ray/data/datasource/parquet_datasource.py (37 additions, 26 deletions):

```diff
@@ -64,7 +64,7 @@
 
 # The number of rows to read from each file for sampling. Try to keep it low to avoid
 # reading too much data into memory.
-PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS = 5
+PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS = 1024
 
 
 # TODO(ekl) this is a workaround for a pyarrow serialization bug, where serializing a
@@ -309,13 +309,17 @@ def _estimate_files_encoding_ratio(self) -> float:
 
         sample_piece = cached_remote_fn(_sample_piece)
         futures = []
-        for idx, sample in enumerate(file_samples):
-            # Sample i-th row group in i-th file.
+        for sample in file_samples:
+            # Sample the first rows batch in i-th file.
             # Use SPREAD scheduling strategy to avoid packing many sampling tasks on
             # same machine to cause OOM issue, as sampling can be memory-intensive.
+            serialized_sample = _SerializedPiece(sample)
             futures.append(
                 sample_piece.options(scheduling_strategy="SPREAD").remote(
-                    _SerializedPiece(sample), idx
+                    self._reader_args,
+                    self._columns,
+                    self._schema,
+                    serialized_sample,
                 )
             )
         sample_bar = ProgressBar("Parquet Files Sample", len(futures))
@@ -419,31 +423,38 @@ def _fetch_metadata(
 
 
 def _sample_piece(
+    reader_args,
+    columns,
+    schema,
     file_piece: _SerializedPiece,
-    row_group_id: int,
 ) -> float:
-    # Sample the `row_group_id`-th row group from file piece `serialized_piece`.
+    # Sample the first rows batch from file piece `serialized_piece`.
     # Return the encoding ratio calculated from the sampled rows.
     piece = _deserialize_pieces_with_retry([file_piece])[0]
 
-    # If required row group index is out of boundary, sample the last row group.
-    row_group_id = min(piece.num_row_groups - 1, row_group_id)
-    assert (
-        row_group_id >= 0 and row_group_id <= piece.num_row_groups - 1
-    ), f"Required row group id {row_group_id} is not in expected bound"
-
-    row_group = piece.subset(row_group_ids=[row_group_id])
-    metadata = row_group.metadata.row_group(0)
-    num_rows = min(PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS, metadata.num_rows)
-    assert num_rows > 0 and metadata.num_rows > 0, (
-        f"Sampled number of rows: {num_rows} and total number of rows: "
-        f"{metadata.num_rows} should be positive"
+    # Only sample the first row group.
+    piece = piece.subset(row_group_ids=[0])
+    batch_size = min(piece.metadata.num_rows, PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS)
+    batches = piece.to_batches(
+        columns=columns,
+        schema=schema,
+        batch_size=batch_size,
+        **reader_args,
     )
-
-    parquet_size = metadata.total_byte_size / metadata.num_rows
-    # Set batch_size to num_rows will instruct Arrow Parquet reader to read exactly
-    # num_rows into memory, o.w. it will read more rows by default in batch manner.
-    in_memory_size = row_group.head(num_rows, batch_size=num_rows).nbytes / num_rows
-    ratio = in_memory_size / parquet_size
-    logger.debug(f"Estimated Parquet encoding ratio is {ratio} for piece {piece}.")
-    return in_memory_size / parquet_size
+    # Use first batch in-memory size as ratio estimation.
+    try:
+        batch = next(batches)
+        in_memory_size = batch.nbytes / batch.num_rows
+        metadata = piece.metadata
+        total_size = 0
+        for idx in range(metadata.num_row_groups):
+            total_size += metadata.row_group(idx).total_byte_size
+        file_size = total_size / metadata.num_rows
+        ratio = in_memory_size / file_size
+    except StopIteration:
+        ratio = PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND
+    logger.debug(
+        f"Estimated Parquet encoding ratio is {ratio} for piece {piece} "
+        f"with batch size {batch_size}."
+    )
+    return ratio
```
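For context on the SPREAD scheduling used above, here is a minimal, hypothetical Ray sketch (the task name and file list are made up, standing in for `_sample_piece` and the real file samples) showing how per-file sampling tasks can be spread across the cluster instead of being packed onto one node:

```python
import ray

ray.init()


@ray.remote
def sample_task(path: str) -> float:
    # Stand-in for per-file sampling work such as _sample_piece;
    # returns a fake encoding ratio here.
    return 1.0


paths = ["a.parquet", "b.parquet"]  # hypothetical file list
# SPREAD scheduling distributes tasks across nodes to avoid concentrating
# memory-intensive sampling on a single machine.
futures = [
    sample_task.options(scheduling_strategy="SPREAD").remote(p) for p in paths
]
print(ray.get(futures))
```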
