Skip to content

Commit

Permalink
expose file size to python dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
eeroel committed Sep 26, 2023
1 parent 772a01c commit 6e183ba
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 12 deletions.
6 changes: 5 additions & 1 deletion cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ class ARROW_DS_EXPORT FileSource : public util::EqualityComparable<FileSource> {
: file_info_(std::move(path)),
filesystem_(std::move(filesystem)),
compression_(compression) {}

/// \brief Construct from a path with a known file size.
///
/// Supplying the size up front lets readers skip the filesystem stat that
/// would otherwise be needed (useful on high-latency filesystems — see the
/// matching FileInfo(path, size, type) constructor).
FileSource(std::string path, int64_t size, std::shared_ptr<fs::FileSystem> filesystem,
           Compression::type compression = Compression::UNCOMPRESSED)
    // int64_t is trivially copyable: pass it by value, no std::move needed
    : file_info_(std::move(path), size),
      filesystem_(std::move(filesystem)),
      compression_(compression) {}
FileSource(fs::FileInfo info, std::shared_ptr<fs::FileSystem> filesystem,
Compression::type compression = Compression::UNCOMPRESSED)
: file_info_(std::move(info)),
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/filesystem/filesystem.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ struct ARROW_EXPORT FileInfo : public util::EqualityComparable<FileInfo> {

explicit FileInfo(std::string path, FileType type = FileType::Unknown)
: path_(std::move(path)), type_(type) {}
/// \brief Construct a FileInfo for a path whose size is already known.
///
/// Pre-populating size_ lets callers avoid a later stat call when the
/// file size is needed before reading (presumably the motivation here is
/// high-latency filesystems — confirm against FileSource usage).
explicit FileInfo(std::string path, int64_t size, FileType type = FileType::Unknown)
    : path_(std::move(path)), type_(type), size_(size) {}

/// The file type
FileType type() const { return type_; }
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ from pyarrow.lib cimport *
from pyarrow._fs cimport FileSystem


cdef CFileSource _make_file_source(object file, FileSystem filesystem=*)
cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, int size=*)


cdef class DatasetFactory(_Weakrefable):
Expand Down
17 changes: 10 additions & 7 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def _get_parquet_symbol(name):
return _dataset_pq and getattr(_dataset_pq, name)


cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, int size=-1):

cdef:
CFileSource c_source
Expand All @@ -108,14 +108,14 @@ cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
if isinstance(file, Buffer):
c_buffer = pyarrow_unwrap_buffer(file)
c_source = CFileSource(move(c_buffer))

elif _is_path_like(file):
if filesystem is None:
raise ValueError("cannot construct a FileSource from "
"a path without a FileSystem")
c_filesystem = filesystem.unwrap()
c_path = tobytes(_stringify_path(file))
c_source = CFileSource(move(c_path), move(c_filesystem))
c_size = size
c_source = CFileSource(move(c_path), move(c_size), move(c_filesystem))

elif hasattr(file, 'read'):
# Optimistically hope this is file-like
Expand Down Expand Up @@ -1230,15 +1230,16 @@ cdef class FileFormat(_Weakrefable):
The schema inferred from the file
"""
cdef:
CFileSource c_source = _make_file_source(file, filesystem)
CFileSource c_source = _make_file_source(file, filesystem=filesystem, size=-1)
CResult[shared_ptr[CSchema]] c_result
with nogil:
c_result = self.format.Inspect(c_source)
c_schema = GetResultValue(c_result)
return pyarrow_wrap_schema(move(c_schema))

def make_fragment(self, file, filesystem=None,
Expression partition_expression=None):
Expression partition_expression=None,
size=-1):
"""
Make a FileFragment from a given file.
Expand All @@ -1252,6 +1253,9 @@ cdef class FileFormat(_Weakrefable):
partition_expression : Expression, optional
An expression that is guaranteed true for all rows in the fragment. Allows
fragment to be potentially skipped while scanning with a filter.
size : int, optional
The size of the file in bytes. Can improve performance with high-latency filesystems
when file size needs to be known before reading.
Returns
-------
Expand All @@ -1260,8 +1264,7 @@ cdef class FileFormat(_Weakrefable):
"""
if partition_expression is None:
partition_expression = _true

c_source = _make_file_source(file, filesystem)
c_source = _make_file_source(file, filesystem=filesystem, size=size)
c_fragment = <shared_ptr[CFragment]> GetResultValue(
self.format.MakeFragment(move(c_source),
partition_expression.unwrap(),
Expand Down
9 changes: 6 additions & 3 deletions python/pyarrow/_dataset_parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ cdef class ParquetFileFormat(FileFormat):
return f"<ParquetFileFormat read_options={self.read_options}>"

def make_fragment(self, file, filesystem=None,
Expression partition_expression=None, row_groups=None):
Expression partition_expression=None, row_groups=None, size=-1):
"""
Make a FileFragment from a given file.
Expand All @@ -242,6 +242,9 @@ cdef class ParquetFileFormat(FileFormat):
fragment to be potentially skipped while scanning with a filter.
row_groups : Iterable, optional
The indices of the row groups to include
size : int, optional
The size of the file in bytes. Can improve performance with high-latency filesystems
when file size needs to be known before reading.
Returns
-------
Expand All @@ -256,9 +259,9 @@ cdef class ParquetFileFormat(FileFormat):

if row_groups is None:
return super().make_fragment(file, filesystem,
partition_expression)
partition_expression, size)

c_source = _make_file_source(file, filesystem)
c_source = _make_file_source(file, filesystem, size=size)
c_row_groups = [<int> row_group for row_group in set(row_groups)]

c_fragment = <shared_ptr[CFragment]> GetResultValue(
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
const c_string& path() const
const shared_ptr[CFileSystem]& filesystem() const
const shared_ptr[CBuffer]& buffer() const
const int size() const
# HACK: Cython can't handle all the overloads so don't declare them.
# This means invalid construction of CFileSource won't be caught in
# the C++ generation phase (though it will still be caught when
Expand Down
42 changes: 42 additions & 0 deletions python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,6 +981,48 @@ def test_make_fragment(multisourcefs):
assert row_group_fragment.row_groups == [0]


@pytest.mark.parquet
def test_make_fragment_with_size(s3_example_simple):
    """Check that a user-supplied ``size`` is honored by make_fragment.

    A correct (implicit) size must round-trip the table; a deliberately
    wrong size must surface an error at read time, proving the reader
    used the supplied size instead of a filesystem stat.
    """
    table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple

    file_format = ds.ParquetFileFormat()
    paths = ['mybucket/data.parquet']

    # Baseline: no explicit size, data round-trips.
    fragments = [file_format.make_fragment(path, fs)
                 for path in paths]
    dataset = ds.FileSystemDataset(
        fragments, format=file_format, schema=table.schema,
        filesystem=fs
    )
    assert dataset.to_table().equals(table)

    # A size far below the real one makes the Parquet footer check fail.
    sizes_toosmall = [1]
    fragments_with_size = [file_format.make_fragment(path, fs, size=size)
                           for path, size in zip(paths, sizes_toosmall)]
    dataset_with_size = ds.FileSystemDataset(
        fragments_with_size, format=file_format, schema=table.schema,
        filesystem=fs
    )
    with pytest.raises(pyarrow.lib.ArrowInvalid,
                       match='Parquet file size is 1 bytes'):
        dataset_with_size.to_table()

    # A size past EOF triggers an out-of-range read error from S3.
    sizes_toolarge = [1000000]
    fragments_with_size = [file_format.make_fragment(path, fs, size=size)
                           for path, size in zip(paths, sizes_toolarge)]
    dataset_with_size = ds.FileSystemDataset(
        fragments_with_size, format=file_format, schema=table.schema,
        filesystem=fs
    )
    with pytest.raises(OSError, match='ExceptionName: InvalidRange'):
        dataset_with_size.to_table()

def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
content = textwrap.dedent("""
alpha,num,animal
Expand Down

0 comments on commit 6e183ba

Please sign in to comment.