GH-37857: [Python][Dataset] Expose file size to python dataset #37868

Merged: 13 commits, Dec 5, 2023
5 changes: 2 additions & 3 deletions python/pyarrow/_dataset.pxd
@@ -22,11 +22,10 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.lib cimport *
-from pyarrow._fs cimport FileSystem
+from pyarrow._fs cimport FileSystem, FileInfo


-cdef CFileSource _make_file_source(object file, FileSystem filesystem=*)
-
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, object file_size=*)

cdef class DatasetFactory(_Weakrefable):

25 changes: 17 additions & 8 deletions python/pyarrow/_dataset.pyx
@@ -32,7 +32,7 @@ from pyarrow.includes.libarrow_dataset cimport *
from pyarrow._acero cimport ExecNodeOptions
from pyarrow._compute cimport Expression, _bind
from pyarrow._compute import _forbid_instantiation
-from pyarrow._fs cimport FileSystem, FileSelector
+from pyarrow._fs cimport FileSystem, FileSelector, FileInfo
from pyarrow._csv cimport (
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
@@ -96,27 +96,33 @@ def _get_parquet_symbol(name):
return _dataset_pq and getattr(_dataset_pq, name)


-cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
+cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, object file_size=None):

cdef:
CFileSource c_source
shared_ptr[CFileSystem] c_filesystem
+        CFileInfo c_info
c_string c_path
shared_ptr[CRandomAccessFile] c_file
shared_ptr[CBuffer] c_buffer
+        int64_t c_size

if isinstance(file, Buffer):
c_buffer = pyarrow_unwrap_buffer(file)
c_source = CFileSource(move(c_buffer))

elif _is_path_like(file):
if filesystem is None:
raise ValueError("cannot construct a FileSource from "
"a path without a FileSystem")
c_filesystem = filesystem.unwrap()
c_path = tobytes(_stringify_path(file))
-        c_source = CFileSource(move(c_path), move(c_filesystem))
-
+        if file_size is not None:
+            c_size = file_size
+            c_info = FileInfo(c_path, size=c_size).unwrap()
+            c_source = CFileSource(move(c_info), move(c_filesystem))
+        else:
+            c_source = CFileSource(move(c_path), move(c_filesystem))
elif hasattr(file, 'read'):
# Optimistically hope this is file-like
c_file = get_native_file(file, False).get_random_access_file()
@@ -1230,15 +1236,16 @@ cdef class FileFormat(_Weakrefable):
The schema inferred from the file
"""
cdef:
-            CFileSource c_source = _make_file_source(file, filesystem)
+            CFileSource c_source = _make_file_source(file, filesystem, file_size=None)
CResult[shared_ptr[CSchema]] c_result
with nogil:
c_result = self.format.Inspect(c_source)
c_schema = GetResultValue(c_result)
return pyarrow_wrap_schema(move(c_schema))

def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None):
+                      Expression partition_expression=None,
+                      *, file_size=None):
"""
Make a FileFragment from a given file.

@@ -1252,6 +1259,9 @@
partition_expression : Expression, optional
An expression that is guaranteed true for all rows in the fragment. Allows
fragment to be potentially skipped while scanning with a filter.
+        file_size : int, optional
+            The size of the file in bytes. Can improve performance with high-latency filesystems
+            when file size needs to be known before reading.

Returns
-------
@@ -1260,8 +1270,7 @@
"""
if partition_expression is None:
            partition_expression = _true
-
-        c_source = _make_file_source(file, filesystem)
+        c_source = _make_file_source(file, filesystem, file_size)
c_fragment = <shared_ptr[CFragment]> GetResultValue(
self.format.MakeFragment(move(c_source),
partition_expression.unwrap(),
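Editor's note: with this change, FileFormat.make_fragment accepts file_size as a keyword-only argument and threads it into the FileSource via a FileInfo carrying the size. A minimal usage sketch, assuming a pre-existing Arrow IPC file at a hypothetical local path (a local filesystem gains little from the hint, but the keyword is accepted by every FileFormat):

    import pyarrow.dataset as ds
    from pyarrow.fs import LocalFileSystem

    fs = LocalFileSystem()
    fmt = ds.IpcFileFormat()

    # The size would normally come from a listing done earlier;
    # "/tmp/data.arrow" is a hypothetical path for illustration.
    size = fs.get_file_info("/tmp/data.arrow").size
    fragment = fmt.make_fragment("/tmp/data.arrow", fs, file_size=size)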
11 changes: 6 additions & 5 deletions python/pyarrow/_dataset_parquet.pyx
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
return f"<ParquetFileFormat read_options={self.read_options}>"

def make_fragment(self, file, filesystem=None,
-                      Expression partition_expression=None, row_groups=None):
+                      Expression partition_expression=None, row_groups=None, *, file_size=None):
"""
Make a FileFragment from a given file.

Expand All @@ -251,6 +251,9 @@ cdef class ParquetFileFormat(FileFormat):
fragment to be potentially skipped while scanning with a filter.
row_groups : Iterable, optional
The indices of the row groups to include
+        file_size : int, optional
+            The size of the file in bytes. Can improve performance with high-latency filesystems
+            when file size needs to be known before reading.

Returns
-------
@@ -259,15 +262,13 @@
"""
cdef:
            vector[int] c_row_groups
-
if partition_expression is None:
            partition_expression = _true
-
if row_groups is None:
return super().make_fragment(file, filesystem,
-                                         partition_expression)
+                                         partition_expression, file_size=file_size)

-        c_source = _make_file_source(file, filesystem)
+        c_source = _make_file_source(file, filesystem, file_size)
c_row_groups = [<int> row_group for row_group in set(row_groups)]

c_fragment = <shared_ptr[CFragment]> GetResultValue(
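Editor's note: ParquetFileFormat.make_fragment forwards file_size on both paths — via super() when row_groups is None, and directly into _make_file_source otherwise — so both call styles benefit. A sketch of the two, assuming a hypothetical S3 bucket, object key, and known size:

    import pyarrow.dataset as ds
    from pyarrow.fs import S3FileSystem

    fs = S3FileSystem(region="us-east-1")  # hypothetical region/credentials
    fmt = ds.ParquetFileFormat()

    # A known size spares S3 a HEAD request before the first read.
    whole = fmt.make_fragment("bucket/part-0.parquet", fs,
                              file_size=1_234_567)
    one_group = fmt.make_fragment("bucket/part-0.parquet", fs,
                                  row_groups=[0], file_size=1_234_567)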
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
const c_string& path() const
const shared_ptr[CFileSystem]& filesystem() const
const shared_ptr[CBuffer]& buffer() const
+        const int64_t size() const
# HACK: Cython can't handle all the overloads so don't declare them.
# This means invalid construction of CFileSource won't be caught in
# the C++ generation phase (though it will still be caught when
58 changes: 58 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -981,6 +981,64 @@ def test_make_fragment(multisourcefs):
assert row_group_fragment.row_groups == [0]


+@pytest.mark.parquet
+@pytest.mark.s3
+def test_make_fragment_with_size(s3_example_simple):
"""
Test passing file_size to make_fragment. Not all FS implementations make use
of the file size (by implementing an OpenInputFile that takes a FileInfo), but
s3 does, which is why it's used here.
"""
table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple

file_format = ds.ParquetFileFormat()
paths = [path]

fragments = [file_format.make_fragment(path, fs)
for path in paths]
dataset = ds.FileSystemDataset(
fragments, format=file_format, schema=table.schema, filesystem=fs
)

tbl = dataset.to_table()
assert tbl.equals(table)

# true sizes -> works
sizes_true = [dataset.filesystem.get_file_info(x).size for x in dataset.files]
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
for path, size in zip(paths, sizes_true)]
dataset_with_size = ds.FileSystemDataset(
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
)
tbl = dataset.to_table()
assert tbl.equals(table)

# too small sizes -> error
sizes_toosmall = [1 for path in paths]
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
for path, size in zip(paths, sizes_toosmall)]

dataset_with_size = ds.FileSystemDataset(
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
)

with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'):
table = dataset_with_size.to_table()

# too large sizes -> error
sizes_toolarge = [1000000 for path in paths]
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
for path, size in zip(paths, sizes_toolarge)]

dataset_with_size = ds.FileSystemDataset(
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
)

# invalid range
with pytest.raises(OSError, match='HTTP status 416'):
+        table = dataset_with_size.to_table()
+
+
def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
content = textwrap.dedent("""
alpha,num,animal
Expand Down
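Editor's note: as the test suggests, the natural source for file_size is a listing you have already performed, since FileSelector results carry FileInfo.size. A dataset over many S3 objects can then be built with one LIST call and no per-file HEAD requests. A sketch under that assumption (bucket, prefix, and region are hypothetical):

    import pyarrow.dataset as ds
    from pyarrow.fs import FileSelector, S3FileSystem

    fs = S3FileSystem(region="us-east-1")  # hypothetical
    fmt = ds.ParquetFileFormat()

    # One LIST call yields both paths and sizes; no per-file HEAD needed.
    infos = fs.get_file_info(FileSelector("bucket/dataset", recursive=True))
    fragments = [fmt.make_fragment(info.path, fs, file_size=info.size)
                 for info in infos if info.is_file]

    dataset = ds.FileSystemDataset(
        fragments, format=fmt,
        schema=fragments[0].physical_schema, filesystem=fs)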