
[Python][Dataset][Parquet] Enable Pre-Buffering by default for Parquet s3 datasets #36765

Closed
Akshay-A-Kulkarni opened this issue Jul 19, 2023 · 16 comments · Fixed by #37854

Comments

@Akshay-A-Kulkarni

Akshay-A-Kulkarni commented Jul 19, 2023

Describe the enhancement requested

Summary

I'm seeing a discrepancy in read times between the parquet and dataset modules when reading partitioned parquet tables (more noticeable on fairly large ones).

In a nutshell, pyarrow.dataset.to_table() seems to be considerably slower than pyarrow.parquet.read_table() in most cases I have tested so far. The difference in read times for some synthetic data (see Appendix) I tested on was approximately 12x.

Details

import sys
sys.version

>>> '3.8.17 (default, Jul 5 2023, 21:04:15) \n[GCC 11.2.0]'

import pyarrow as pa
pa.__version__

>>> '12.0.1'

The test was done on a machine similar in specs to an m5.4xlarge (nCPU=16 | Memory=64 GiB) EC2 instance.

%%time
path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'

import pyarrow.parquet as pq
table_pq = pq.read_table(path, partitioning='hive')
print(table_pq.schema)
print(table_pq.shape)

a: string
b: int64
c: double
part_col: dictionary<values=string, indices=int32, ordered=0>
(100000000, 4)

CPU times: user 21.2 s, sys: 5.4 s, total: 26.6 s
Wall time: 10.6 s

%%time
path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'

import pyarrow.dataset as ds

table_ds = ds.dataset(
    path, 
    partitioning='hive', 
    format='parquet'
).to_table()

print(table_ds.schema)
print(table_ds.shape)

a: string
b: int64
c: double
part_col: dictionary<values=string, indices=int32, ordered=0>
(100000000, 4)

CPU times: user 49.7 s, sys: 6.67 s, total: 56.4 s
Wall time: 1min 57s

From doing some digging, I found that the root cause of the issue seems to be pre_buffer, which is not enabled for pyarrow.dataset by default.

Once I enable pre-buffering via ParquetFileFormat & ParquetFragmentScanOptions, the issue gets resolved.

%%time
table_ds_buffer = ds.dataset(
    path, 
    partitioning='hive', 
    format=ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(
            pre_buffer=True
        )
    )
).to_table()



print(table_ds_buffer.schema)
print(table_ds_buffer.shape)

a: string
b: int64
c: double
part_col: string
(100000000, 4)

CPU times: user 21 s, sys: 5.27 s, total: 26.2 s
Wall time: 9.4 s

The issue seems to be exacerbated for parquet datasets with partitions (maybe with more file parts in general?). Doing the same read test on a single parquet file was still slower, but not as slow as on the partitioned one.

Do we know the reasoning for this? As far as I understand, for S3-based parquet datasets this could be turned on by default. Otherwise, maybe we could add it as an argument for the user to enable in the dataset() call?

Appendix

Code to generate test data

%%time
from string import ascii_letters

import numpy as np
import pyarrow as pa
import pyarrow.dataset as ds

num_rows = 100_000_000
num_files = 1  # 32/64
num_partitions = 4

path = 's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR'

table = pa.Table.from_pydict({
    'a': [str(i) for i in range(num_rows)],
    'b': range(num_rows),
    'c': [float(i) for i in range(num_rows)],
    'part_col': np.random.choice(list(ascii_letters[:num_partitions]), size=num_rows,
                                 # p=[0.8, 0.05, 0.1, 0.05]  # for creating row-count skew between partitions
                                 )
})


ds.write_dataset(
    table,
    path,
    format='parquet',
    partitioning_flavor='hive',
    partitioning=['part_col'],
    max_rows_per_file = (num_rows//(num_files*num_partitions)), 
    existing_data_behavior='overwrite_or_ignore'
)

Component(s)

Parquet, Python

@mapleFU
Member

mapleFU commented Jul 19, 2023

Nice catch! I guess to_table should prebuffer the data, let me take a look

@mapleFU
Member

mapleFU commented Jul 19, 2023

to_table:

        return self.scanner(
            schema=schema,
            columns=columns,
            filter=filter,
            batch_size=batch_size,
            batch_readahead=batch_readahead,
            fragment_readahead=fragment_readahead,
            fragment_scan_options=fragment_scan_options,
            use_threads=use_threads,
            memory_pool=memory_pool
        ).to_table()

It calls DatasetScanner::ToTable. By default it has a prefetch depth (fragment_readahead and batch_readahead), but I think that is not fully used in the scanner. And fragment_scan_options is what gets used in the C++ reader's code.

@westonpace would you mind taking a look or giving some advice?

@westonpace
Member

fragment_readahead and batch_readahead control how many files/row-groups to read at a time. pre_buffer controls how an individual row group is read. So these are separate properties. pre_buffer is probably always a good thing when reading from S3. However, when reading from local disk I think pre_buffer can sometimes lead to greater memory consumption. Is pre_buffer=True the default for read_table?
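
For illustration, a minimal sketch of how the two kinds of knobs are set independently from Python (the readahead values here are arbitrary examples, not defaults):

import pyarrow.dataset as ds

dataset = ds.dataset(
    's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR',
    partitioning='hive',
    format='parquet',
)

# fragment_readahead / batch_readahead: scanner-level knobs for how many
# files / batches are in flight at once.
# pre_buffer: a Parquet reader knob for how a single row group is fetched;
# it is passed down via fragment_scan_options.
table = dataset.to_table(
    fragment_readahead=4,
    batch_readahead=16,
    fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True),
)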

@mapleFU
Member

mapleFU commented Jul 19, 2023

I guess

table_ds = ds.dataset(
    path, 
    partitioning='hive', 
    format='parquet'
).to_table()

print(table_ds.schema)
print(table_ds.shape)

will not pre-buffer when reading the Parquet dataset, so it might cause a performance loss.

def read_table(source, *, columns=None, use_threads=True, metadata=None,
               schema=None, use_pandas_metadata=False, read_dictionary=None,
               memory_map=False, buffer_size=0, partitioning="hive",
               filesystem=None, filters=None, use_legacy_dataset=False,
               ignore_prefixes=None, pre_buffer=True,
               coerce_int96_timestamp_unit=None,
               decryption_properties=None, thrift_string_size_limit=None,
               thrift_container_size_limit=None):

And this has pre_buffer enabled by default.

@mapleFU
Member

mapleFU commented Jul 19, 2023

You can enable pre_buffer first, or use a larger buffer_size to do buffered reads in Parquet.

  • pre_buffer: may issue the I/O requests before doing the CPU-side read
  • buffer_size: unlike a local SSD, S3 may prefer reads of about 1 MB, so a larger buffer size could work better (see the sketch below).
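
As a hedged illustration of that second knob, buffered-stream reading is also exposed through ParquetFragmentScanOptions (the 1 MB value just mirrors the suggestion above, it is not a library default):

import pyarrow.dataset as ds

# Buffered-stream reading: rather than caching whole column chunks up front
# (pre_buffer), read the file through a fixed-size buffer.
scan_options = ds.ParquetFragmentScanOptions(
    use_buffered_stream=True,
    buffer_size=1 * 1024 * 1024,  # ~1 MB, per the suggestion above
)

table = ds.dataset(
    's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR',
    partitioning='hive',
    format=ds.ParquetFileFormat(default_fragment_scan_options=scan_options),
).to_table()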

@westonpace
Member

I did some testing a few weeks ago and I also found that buffer_size was better when it came to memory usage. pre_buffer will fetch the entire column for the entire row group (or row groups) that you are reading.

However, if pre_buffer is the default for read_table then I think it is ok to make it the default for datasets.

@mapleFU
Member

mapleFU commented Jul 19, 2023

@westonpace Since the Acero scan node prefetches some fragments by default, would it cause worse memory usage than the previous read_table?

@westonpace
Member

Possibly. I think there are two concerns users generally have. Either they want "max speed" (use as little memory as possible but, if there is a speed/memory tradeoff, prefer speed) or they want "least memory" (if there is a speed/memory tradeoff, prefer memory)

I tried to simplify things in #35889 (this is why I was running experiments a few weeks ago) and came up with:

parquet::ReaderProperties

      case ParquetScanStrategy::kLeastMemory:
        properties.enable_buffered_stream();
        properties.set_buffer_size(8 * 1024 * 1024);
        break;
      case ParquetScanStrategy::kMaxSpeed:
        properties.disable_buffered_stream();
        break;

parquet::ArrowReaderProperties

      case ParquetScanStrategy::kLeastMemory:
        properties.set_batch_size(acero::ExecPlan::kMaxBatchSize);
        properties.set_pre_buffer(false);
        break;
      case ParquetScanStrategy::kMaxSpeed:
        properties.set_batch_size(64 * 1024 * 1024);
        properties.set_pre_buffer(true);
        properties.set_cache_options(io::CacheOptions::LazyDefaults());
        break;

I'm pretty sure that the kLeastMemory options use very low memory (even when scanning large files). I wasn't convinced that kMaxSpeed was much faster (but I only tested local disks and not S3).

I am very surprised to see a 10x difference due to pre-buffering. I don't think any of our experiments (S3 or not) ever showed a difference that was that drastic.

@westonpace
Member

Since the Acero scan node prefetches some fragments by default, would it cause worse memory usage than the previous read_table?

Ah, I misunderstood this question. Yes, scan node / datasets will always use more memory than read_table because it is reading multiple files at once. Hopefully we can still do some buffering though.
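
If the extra memory from reading multiple files at once is a concern, the fragment_readahead knob quoted earlier can be lowered; a small sketch (the value 2 is purely illustrative):

import pyarrow.dataset as ds

# Fewer fragments in flight trades some throughput for lower peak memory.
table = ds.dataset(
    's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR',
    partitioning='hive',
    format='parquet',
).to_table(fragment_readahead=2)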

@Akshay-A-Kulkarni
Author

fragment_readahead and batch_readahead control how many files/row-groups to read at a time. pre_buffer controls how an individual row group is read. So these are separate properties. pre_buffer is probably always a good thing when reading from S3. However, when reading from local disk I think pre_buffer can sometimes lead to greater memory consumption. Is pre_buffer=True the default for read_table?

@westonpace With the limited understanding that I have, if memory consumption on a local filesystem is an issue, could we check the filesystem in the dataset() call and, if it's S3, enable pre-buffering for parquet datasets?
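
A rough sketch of what that check could look like, resolving the filesystem with pyarrow.fs.FileSystem.from_uri; the wrapper function here is hypothetical, not an existing API:

import pyarrow.dataset as ds
from pyarrow.fs import FileSystem, S3FileSystem


def dataset_with_prebuffer_for_s3(uri, **kwargs):
    """Hypothetical helper: enable pre_buffer only when the data lives on S3."""
    filesystem, path = FileSystem.from_uri(uri)
    fmt = ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(
            pre_buffer=isinstance(filesystem, S3FileSystem)
        )
    )
    return ds.dataset(path, filesystem=filesystem, format=fmt, **kwargs)


# usage, matching the example above:
# table = dataset_with_prebuffer_for_s3(
#     's3://BUCKET/PREFIX/PARTITIONED_DATA_BASE_DIR', partitioning='hive'
# ).to_table()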

@Akshay-A-Kulkarni
Author

.....
I'm pretty sure that the kLeastMemory options use very low memory (even when scanning large files). I wasn't convinced that kMaxSpeed was much faster (but I only tested local disks and not S3).

I am very surprised to see a 10x difference due to pre-buffering. I don't think any of our experiments (S3 or not) ever showed a difference that was that drastic.

@westonpace I was surprised as well, but I've received similar speedup results from a colleague facing the same issue. On that note, are you able to replicate the issue with the snippet?

@westonpace
Member

@westonpace I was surprised as well, but I've received similar speedup results from a colleague facing the same issue. On that note, are you able to replicate the issue with the snippet?

Yes, I am.

I agree that it is pretty important that we make this the default, at least for S3, if not the general default (which I think would probably be ok).

@ldacey

ldacey commented Sep 25, 2023

I was not aware of this toggle before, but I had massive performance improvements when I turned on pre_buffer: from 249 seconds to 2.9 seconds for 1.8 GB of data (2 million+ rows) when querying for one month of data. The data is partitioned monthly and goes back to 2018.

This was with the adlfs filesystem querying for files on Azure Blob.
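
For reference, a hedged sketch of passing pre_buffer when the dataset sits behind an fsspec filesystem such as adlfs (the account name, container path, and the 'month' partition column are placeholders/assumptions, not details from the report above):

import adlfs
import pyarrow.dataset as ds

# fsspec filesystem for Azure Blob; the account name is a placeholder
fs = adlfs.AzureBlobFileSystem(account_name='ACCOUNT')

dataset = ds.dataset(
    'container/path/to/partitioned_data',
    filesystem=fs,
    partitioning='hive',
    format=ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=True)
    ),
)

# e.g. reading one month, assuming a hypothetical 'month' partition column
table = dataset.to_table(filter=(ds.field('month') == '2023-09'))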

@jorisvandenbossche
Member

I was chatting about this issue with some people at PyData Amsterdam, and was planning to make a PR to just switch the default when I got back, so here it is: #37854

That's only changing the default for Python (pyarrow.dataset), but should we also change the default in C++?
From a basic check, it seems the R code already sets it by default (this was changed a while ago in #11386).

I noticed that the R PR was also setting cache_options to LazyDefaults. Is that something we also want to change on the Python/C++ side? (The current default is CacheOptions::Defaults().)

Another useful reference for the above discussion is #28218 (https://issues.apache.org/jira/browse/ARROW-12428), where @lidavidm did some benchmarks with pre_buffer enabled/disabled, and which was the reason for exposing the pre_buffer option in pyarrow.parquet with a default of True (#10074)
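
Assuming #37854 lands and flips the Python default, opting back out would just be a matter of passing the option explicitly; a short sketch:

import pyarrow.dataset as ds

# With the flipped default, a plain ds.dataset(...).to_table() would
# pre-buffer; explicitly disabling it again for a local-disk dataset:
table = ds.dataset(
    '/local/path/to/partitioned_data',
    partitioning='hive',
    format=ds.ParquetFileFormat(
        default_fragment_scan_options=ds.ParquetFragmentScanOptions(pre_buffer=False)
    ),
).to_table()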

@mapleFU
Member

mapleFU commented Sep 26, 2023

Status FileReaderImpl::GetRecordBatchReader(const std::vector<int>& row_groups,
                                            const std::vector<int>& column_indices,
                                            std::unique_ptr<RecordBatchReader>* out) {
  RETURN_NOT_OK(BoundsCheck(row_groups, column_indices));

  if (reader_properties_.pre_buffer()) {
    // PARQUET-1698/PARQUET-1820: pre-buffer row groups/column chunks if enabled
    BEGIN_PARQUET_CATCH_EXCEPTIONS
    reader_->PreBuffer(row_groups, column_indices, reader_properties_.io_context(),
                       reader_properties_.cache_options());
    END_PARQUET_CATCH_EXCEPTIONS
  }

Here, PreBuffer will try to buffer the required RowGroups if necessary, and the memory will not be released until the read is finished. It's different from buffered-stream mode (actually, buffered-stream mode might decrease memory usage, lol).

Even when the cache policy is lazy, the reader might not get faster if the RowGroup is large enough, and the memory will not be released before the read is finished. So I wonder if this is OK.

@lidavidm
Member

lidavidm commented Oct 5, 2023

Yeah, admittedly pre-buffer was a bit of a hack to minimize the changes to the Parquet reader. Ideally you want the Parquet reader to batch its I/O calls (as pre-buffer does) without necessarily caching them. But from what I remember, the reader is not designed that way (selecting columns eventually leads to a lot of disparate I/O calls far down the stack and you'd have to do a bunch of work to untangle that, hence caching was the easiest; that's also why the cache doesn't dump memory when things are done - it's hard from this level to tell when that time is).

jorisvandenbossche added a commit to jorisvandenbossche/arrow that referenced this issue Oct 5, 2023
jorisvandenbossche added a commit that referenced this issue Oct 6, 2023
…reading Parquet files (#37854)

### Rationale for this change

Enabling `pre_buffer` can give a significant speed-up on filesystems like S3, while it doesn't give noticeable slowdown on local filesystems, based on benchmarks in the issue. Therefore simply enabling it by default seems the best default.

The option was already enabled by default in the `pyarrow.parquet.read_table` interface, this PR aligns the defaults when using `pyarrow.dataset` directly.
* Closes: #36765

Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
JerAguilon pushed a commit to JerAguilon/arrow that referenced this issue Oct 23, 2023
loicalleyne pushed a commit to loicalleyne/arrow that referenced this issue Nov 13, 2023
dgreiss pushed a commit to dgreiss/arrow that referenced this issue Feb 19, 2024