
Add skiprows and nrows to parquet reader #16214

Merged: 30 commits, Aug 1, 2024

Changes from 20 commits

Commits
164533e
wip
lithomas1 Jul 5, 2024
3b0427a
revert polars changes
lithomas1 Jul 8, 2024
03c1889
fixes
lithomas1 Jul 8, 2024
bf5e902
rollback changes to chunked parquet reader
lithomas1 Jul 8, 2024
56c88ed
revert changes to parquetreader
lithomas1 Jul 8, 2024
f52b606
raise notimplemented for chunked parquet reader nrows/skiprows
lithomas1 Jul 8, 2024
3eeb95a
fix docs
lithomas1 Jul 8, 2024
0c722da
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 9, 2024
cc37737
notimplemented for partitioned as well
lithomas1 Jul 9, 2024
5e3037e
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 9, 2024
917761f
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 12, 2024
9c6a5da
buggy chunked parquet reader nrows/skiprows
lithomas1 Jul 12, 2024
a78f97f
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 22, 2024
4f929e5
fix some tests
lithomas1 Jul 22, 2024
3bb52c1
more data
lithomas1 Jul 22, 2024
3900019
Merge branch 'branch-24.08' into parquet-nrows
mhaseeb123 Jul 23, 2024
5055fd0
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 23, 2024
7bd2438
fix range index metadata processing
lithomas1 Jul 23, 2024
e1982fa
Update python/cudf/cudf/tests/test_parquet.py
lithomas1 Jul 23, 2024
30faf88
update
lithomas1 Jul 23, 2024
d140184
rename params
lithomas1 Jul 29, 2024
2943f74
Merge branch 'branch-24.10' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 29, 2024
07411c1
fix typo
lithomas1 Jul 30, 2024
9ce3ceb
Merge branch 'branch-24.10' of github.com:rapidsai/cudf into parquet-…
lithomas1 Jul 30, 2024
18082c7
another missed one
lithomas1 Jul 30, 2024
fec25b0
fix pylibcudf tests
lithomas1 Jul 30, 2024
a2fad68
last fixes
lithomas1 Aug 1, 2024
6447f12
Merge branch 'parquet-nrows' of github.com:lithomas1/cudf into parque…
lithomas1 Aug 1, 2024
9ea93ba
Merge branch 'branch-24.10' into parquet-nrows
lithomas1 Aug 1, 2024
593ccd2
fix cudf-polars
lithomas1 Aug 1, 2024
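For orientation before the diffs: a minimal usage sketch of the behaviour this PR adds, mirroring the new test further below (the toy frame and in-memory buffer are illustrative, not part of the change):

```python
from io import BytesIO

import pandas as pd

import cudf

# Write a small parquet payload to an in-memory buffer.
pdf = pd.DataFrame({"a": range(10), "b": list("abcdefghij")})
buffer = BytesIO()
pdf.to_parquet(buffer)

# Skip the first 3 rows, then read the next 4.
got = cudf.read_parquet(buffer, skip_rows=3, nrows=4)

# Equivalent pandas slice for comparison (rows 3..6 inclusive).
expected = pdf[3 : 3 + 4]
```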
35 changes: 26 additions & 9 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -22,7 +22,7 @@ from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io

from cudf._lib.utils import _index_level_name, generate_pandas_metadata

from libc.stdint cimport uint8_t
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
@@ -132,7 +132,10 @@ cdef object _process_metadata(object df,
object filepaths_or_buffers,
list pa_buffers,
bool allow_range_index,
bool use_pandas_metadata):
bool use_pandas_metadata,
size_type num_rows=-1,
int64_t skip_rows=0,
):

add_df_col_struct_names(df, child_names)
index_col = None
@@ -221,9 +224,13 @@ else:
else:
idx = cudf.Index(cudf.core.column.column_empty(0))
else:
start = range_index_meta["start"] + skip_rows
stop = range_index_meta["stop"]
if num_rows != -1:
stop = start + num_rows
idx = cudf.RangeIndex(
start=range_index_meta['start'],
stop=range_index_meta['stop'],
start=start,
stop=stop,
step=range_index_meta['step'],
name=range_index_meta['name']
)
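As a quick sanity check of the hunk above, here is the same RangeIndex arithmetic spelled out in plain Python for the common step=1 case (the metadata values are illustrative):

```python
# Metadata as recorded by pandas in the parquet footer (illustrative values).
range_index_meta = {"start": 0, "stop": 400_000, "step": 1, "name": None}
skip_rows, num_rows = 1_000, 500

start = range_index_meta["start"] + skip_rows  # 1000: index begins after the skipped rows
stop = range_index_meta["stop"]
if num_rows != -1:                             # -1 is the "read all remaining rows" sentinel
    stop = start + num_rows                    # 1500: index covers exactly num_rows rows
# Reconstructed index: RangeIndex(start=1000, stop=1500, step=1)
```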
@@ -260,7 +267,9 @@ def read_parquet_chunked(
row_groups=None,
use_pandas_metadata=True,
size_t chunk_read_limit=0,
size_t pass_read_limit=1024000000
size_t pass_read_limit=1024000000,
size_type nrows=-1,
int64_t skip_rows=0
):
# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
@@ -287,7 +296,9 @@
row_groups,
use_pandas_metadata,
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit
pass_read_limit=pass_read_limit,
skip_rows=skip_rows,
num_rows=nrows,
)

tbl_w_meta = reader.read_chunk()
@@ -320,13 +331,16 @@ def read_parquet_chunked(
df = _process_metadata(df, column_names, child_names,
per_file_user_data, row_groups,
filepaths_or_buffers, pa_buffers,
allow_range_index, use_pandas_metadata)
allow_range_index, use_pandas_metadata,
num_rows=nrows, skip_rows=skip_rows)
return df


cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None):
Expression filters=None,
size_type nrows=-1,
int64_t skip_rows=0):
"""
Cython function to call into libcudf API, see `read_parquet`.

@@ -362,6 +376,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
filters,
convert_strings_to_categories = False,
use_pandas_metadata = use_pandas_metadata,
skip_rows = skip_rows,
num_rows = nrows,
mhaseeb123 (Member) commented on Jul 25, 2024:
Let's be consistent with either num_rows or nrows across the files. @galipremsagar I can't find the same option in pyarrow.read_table or pd.read_parquet, so I'm not sure what should be preferred here. If it's arbitrary, my vote would be num_rows, to be consistent with the C++ counterpart, but this is not a blocker.

lithomas1 (Contributor, Author) commented:
Yeah not sure which is better.

nrows would be consistent with read_csv, and num_rows would be consistent with libcudf.

mhaseeb123 (Member) commented on Jul 25, 2024:
Let's go with nrows then and push the PR forward to merge! 🙂

)

df = cudf.DataFrame._from_data(
@@ -371,7 +387,8 @@
df = _process_metadata(df, tbl_w_meta.column_names(include_children=False),
tbl_w_meta.child_names, tbl_w_meta.per_file_user_data,
row_groups, filepaths_or_buffers, pa_buffers,
allow_range_index, use_pandas_metadata)
allow_range_index, use_pandas_metadata,
num_rows=nrows, skip_rows=skip_rows)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
23 changes: 23 additions & 0 deletions python/cudf/cudf/io/parquet.py
@@ -539,6 +539,8 @@ def read_parquet(
open_file_options=None,
bytes_per_thread=None,
dataset_kwargs=None,
nrows=None,
skip_rows=None,
*args,
**kwargs,
):
@@ -685,6 +687,8 @@
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
nrows=nrows,
skip_rows=skip_rows,
**kwargs,
)
# Apply filters row-wise (if any are defined), and return
@@ -813,18 +817,27 @@ def _parquet_to_frame(
partition_keys=None,
partition_categories=None,
dataset_kwargs=None,
nrows=None,
skip_rows=None,
**kwargs,
):
# If this is not a partitioned read, only need
# one call to `_read_parquet`
if not partition_keys:
return _read_parquet(
paths_or_buffers,
nrows=nrows,
skip_rows=skip_rows,
*args,
row_groups=row_groups,
**kwargs,
)

if nrows is not None or skip_rows is not None:
raise NotImplementedError(
"nrows/skip_rows is not supported when reading a partitioned parquet dataset"
)

partition_meta = None
partitioning = (dataset_kwargs or {}).get("partitioning", None)
if hasattr(partitioning, "schema"):
@@ -912,6 +925,8 @@ def _read_parquet(
columns=None,
row_groups=None,
use_pandas_metadata=None,
nrows=None,
skip_rows=None,
*args,
**kwargs,
):
@@ -934,13 +949,21 @@
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
nrows=nrows if nrows is not None else -1,
skip_rows=skip_rows if skip_rows is not None else 0,
)
else:
if nrows is None:
nrows = -1
if skip_rows is None:
skip_rows = 0
return libparquet.read_parquet(
filepaths_or_buffers,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
nrows=nrows,
skip_rows=skip_rows,
)
else:
if (
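Putting the io/parquet.py changes together: None defaults are normalised to the Cython reader's sentinels, and partitioned datasets are rejected for now. A hedged sketch of the resulting user-facing behaviour (the file paths are hypothetical):

```python
import cudf

# Plain (non-partitioned) files: nrows=None maps to -1 ("all remaining rows")
# and skip_rows=None maps to 0 before the Cython reader is called.
df = cudf.read_parquet("data.parquet", nrows=1000, skip_rows=10)

# Partitioned datasets: either keyword raises, per _parquet_to_frame above.
try:
    cudf.read_parquet("partitioned_dataset/", nrows=1000)
except NotImplementedError as exc:
    print(exc)
```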
41 changes: 41 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -1978,6 +1978,26 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename):
assert fn == filename


@pytest.mark.parametrize("kwargs", [{"nrows": 1}, {"skip_rows": 1}])
def test_parquet_partitioned_notimplemented(tmpdir_factory, kwargs):
# Check that nrows/skip_rows are rejected when reading
# a partitioned parquet dataset
pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
size = 100
pdf = pd.DataFrame(
{
"a": np.arange(0, stop=size, dtype="int64"),
"b": np.random.choice(list("abcd"), size=size),
"c": np.random.choice(np.arange(4), size=size),
}
)
pdf.to_parquet(pdf_dir, index=False, partition_cols=["b"])

# cudf should raise NotImplementedError for partitioned datasets
with pytest.raises(NotImplementedError):
cudf.read_parquet(pdf_dir, **kwargs)


@pytest.mark.parametrize("return_meta", [True, False])
def test_parquet_writer_chunked_partitioned(tmpdir_factory, return_meta):
pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
@@ -3768,6 +3788,27 @@ def test_parquet_chunked_reader(
assert_eq(expected, actual)


@pytest.mark.parametrize(
"nrows,skip_rows",
[
(0, 0),
(1000, 0),
(0, 1000),
(1000, 10000),
],
)
def test_parquet_reader_nrows_skiprows(nrows, skip_rows):
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 100000, "b": ["av", "qw", "hi", "xyz"] * 100000}
)
expected = df[skip_rows : skip_rows + nrows]
buffer = BytesIO()
df.to_parquet(buffer)
got = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows)
# Check results
assert_eq(expected, got)


def test_parquet_reader_pandas_compatibility():
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000}
10 changes: 10 additions & 0 deletions python/cudf/cudf/utils/ioutils.py
@@ -199,6 +199,16 @@
in parallel (using a python thread pool). Default allocation is
{bytes_per_thread} bytes.
This parameter is functional only when `use_python_file_object=False`.
skip_rows : int, default None
If not None, the number of rows to skip from the start of the file.

.. note::
This option is not supported when the low-memory mode is on.
nrows : int, default None
If not None, the total number of rows to read.

.. note::
This option is not supported when the low-memory mode is on.

Returns
-------