Skip to content

Commit

Permalink
apacheGH-37857: [Python][Dataset] Expose file size to python dataset (a…
Browse files Browse the repository at this point in the history
…pache#37868)

### Rationale for this change

Allow passing known file sizes to `make_fragment`, to avoid potential network requests.

### What changes are included in this PR?

### Are these changes tested?

Yes, tests with S3 that file size gets used.

### Are there any user-facing changes?

Yes, new function arguments.

* Closes: apache#37857

Lead-authored-by: Eero Lihavainen <[email protected]>
Co-authored-by: Benjamin Kietzman <[email protected]>
Co-authored-by: Eero Lihavainen <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
2 people authored and dgreiss committed Feb 17, 2024
1 parent fc87444 commit 072a752
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 16 deletions.
5 changes: 2 additions & 3 deletions python/pyarrow/_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.lib cimport *
from pyarrow._fs cimport FileSystem
from pyarrow._fs cimport FileSystem, FileInfo


cdef CFileSource _make_file_source(object file, FileSystem filesystem=*)

cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, object file_size=*)

cdef class DatasetFactory(_Weakrefable):

Expand Down
25 changes: 17 additions & 8 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ from pyarrow.includes.libarrow_dataset cimport *
from pyarrow._acero cimport ExecNodeOptions
from pyarrow._compute cimport Expression, _bind
from pyarrow._compute import _forbid_instantiation
from pyarrow._fs cimport FileSystem, FileSelector
from pyarrow._fs cimport FileSystem, FileSelector, FileInfo
from pyarrow._csv cimport (
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
Expand Down Expand Up @@ -96,27 +96,33 @@ def _get_parquet_symbol(name):
return _dataset_pq and getattr(_dataset_pq, name)


cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, object file_size=None):

cdef:
CFileSource c_source
shared_ptr[CFileSystem] c_filesystem
CFileInfo c_info
c_string c_path
shared_ptr[CRandomAccessFile] c_file
shared_ptr[CBuffer] c_buffer
int64_t c_size

if isinstance(file, Buffer):
c_buffer = pyarrow_unwrap_buffer(file)
c_source = CFileSource(move(c_buffer))

elif _is_path_like(file):
if filesystem is None:
raise ValueError("cannot construct a FileSource from "
"a path without a FileSystem")
c_filesystem = filesystem.unwrap()
c_path = tobytes(_stringify_path(file))
c_source = CFileSource(move(c_path), move(c_filesystem))

if file_size is not None:
c_size = file_size
c_info = FileInfo(c_path, size=c_size).unwrap()
c_source = CFileSource(move(c_info), move(c_filesystem))
else:
c_source = CFileSource(move(c_path), move(c_filesystem))
elif hasattr(file, 'read'):
# Optimistically hope this is file-like
c_file = get_native_file(file, False).get_random_access_file()
Expand Down Expand Up @@ -1230,15 +1236,16 @@ cdef class FileFormat(_Weakrefable):
The schema inferred from the file
"""
cdef:
CFileSource c_source = _make_file_source(file, filesystem)
CFileSource c_source = _make_file_source(file, filesystem, file_size=None)
CResult[shared_ptr[CSchema]] c_result
with nogil:
c_result = self.format.Inspect(c_source)
c_schema = GetResultValue(c_result)
return pyarrow_wrap_schema(move(c_schema))

def make_fragment(self, file, filesystem=None,
Expression partition_expression=None):
Expression partition_expression=None,
*, file_size=None):
"""
Make a FileFragment from a given file.
Expand All @@ -1252,6 +1259,9 @@ cdef class FileFormat(_Weakrefable):
partition_expression : Expression, optional
An expression that is guaranteed true for all rows in the fragment. Allows
fragment to be potentially skipped while scanning with a filter.
file_size : int, optional
The size of the file in bytes. Can improve performance with high-latency filesystems
when file size needs to be known before reading.
Returns
-------
Expand All @@ -1260,8 +1270,7 @@ cdef class FileFormat(_Weakrefable):
"""
if partition_expression is None:
partition_expression = _true

c_source = _make_file_source(file, filesystem)
c_source = _make_file_source(file, filesystem, file_size)
c_fragment = <shared_ptr[CFragment]> GetResultValue(
self.format.MakeFragment(move(c_source),
partition_expression.unwrap(),
Expand Down
11 changes: 6 additions & 5 deletions python/pyarrow/_dataset_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
return f"<ParquetFileFormat read_options={self.read_options}>"

def make_fragment(self, file, filesystem=None,
Expression partition_expression=None, row_groups=None):
Expression partition_expression=None, row_groups=None, *, file_size=None):
"""
Make a FileFragment from a given file.
Expand All @@ -251,6 +251,9 @@ cdef class ParquetFileFormat(FileFormat):
fragment to be potentially skipped while scanning with a filter.
row_groups : Iterable, optional
The indices of the row groups to include
file_size : int, optional
The size of the file in bytes. Can improve performance with high-latency filesystems
when file size needs to be known before reading.
Returns
-------
Expand All @@ -259,15 +262,13 @@ cdef class ParquetFileFormat(FileFormat):
"""
cdef:
vector[int] c_row_groups

if partition_expression is None:
partition_expression = _true

if row_groups is None:
return super().make_fragment(file, filesystem,
partition_expression)
partition_expression, file_size=file_size)

c_source = _make_file_source(file, filesystem)
c_source = _make_file_source(file, filesystem, file_size)
c_row_groups = [<int> row_group for row_group in set(row_groups)]

c_fragment = <shared_ptr[CFragment]> GetResultValue(
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
const c_string& path() const
const shared_ptr[CFileSystem]& filesystem() const
const shared_ptr[CBuffer]& buffer() const
const int64_t size() const
# HACK: Cython can't handle all the overloads so don't declare them.
# This means invalid construction of CFileSource won't be caught in
# the C++ generation phase (though it will still be caught when
Expand Down
58 changes: 58 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,64 @@ def test_make_fragment(multisourcefs):
assert row_group_fragment.row_groups == [0]


@pytest.mark.parquet
@pytest.mark.s3
def test_make_fragment_with_size(s3_example_simple):
"""
Test passing file_size to make_fragment. Not all FS implementations make use
of the file size (by implementing an OpenInputFile that takes a FileInfo), but
s3 does, which is why it's used here.
"""
table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple

file_format = ds.ParquetFileFormat()
paths = [path]

fragments = [file_format.make_fragment(path, fs)
for path in paths]
dataset = ds.FileSystemDataset(
fragments, format=file_format, schema=table.schema, filesystem=fs
)

tbl = dataset.to_table()
assert tbl.equals(table)

# true sizes -> works
sizes_true = [dataset.filesystem.get_file_info(x).size for x in dataset.files]
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
for path, size in zip(paths, sizes_true)]
dataset_with_size = ds.FileSystemDataset(
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
)
tbl = dataset.to_table()
assert tbl.equals(table)

# too small sizes -> error
sizes_toosmall = [1 for path in paths]
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
for path, size in zip(paths, sizes_toosmall)]

dataset_with_size = ds.FileSystemDataset(
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
)

with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'):
table = dataset_with_size.to_table()

# too large sizes -> error
sizes_toolarge = [1000000 for path in paths]
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
for path, size in zip(paths, sizes_toolarge)]

dataset_with_size = ds.FileSystemDataset(
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
)

# invalid range
with pytest.raises(OSError, match='HTTP status 416'):
table = dataset_with_size.to_table()


def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
content = textwrap.dedent("""
alpha,num,animal
Expand Down

0 comments on commit 072a752

Please sign in to comment.