Skip to content

Commit

Permalink
Add skiprows and nrows to parquet reader (#16214)
Browse files Browse the repository at this point in the history
closes #15144

Authors:
  - Thomas Li (https://github.com/lithomas1)
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16214
  • Loading branch information
lithomas1 authored Aug 1, 2024
1 parent 211dbe4 commit 9d0c57a
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 21 deletions.
35 changes: 26 additions & 9 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io

from cudf._lib.utils import _index_level_name, generate_pandas_metadata

from libc.stdint cimport uint8_t
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
Expand Down Expand Up @@ -132,7 +132,10 @@ cdef object _process_metadata(object df,
object filepaths_or_buffers,
list pa_buffers,
bool allow_range_index,
bool use_pandas_metadata):
bool use_pandas_metadata,
size_type nrows=-1,
int64_t skip_rows=0,
):

add_df_col_struct_names(df, child_names)
index_col = None
Expand Down Expand Up @@ -221,9 +224,13 @@ cdef object _process_metadata(object df,
else:
idx = cudf.Index(cudf.core.column.column_empty(0))
else:
start = range_index_meta["start"] + skip_rows
stop = range_index_meta["stop"]
if nrows != -1:
stop = start + nrows
idx = cudf.RangeIndex(
start=range_index_meta['start'],
stop=range_index_meta['stop'],
start=start,
stop=stop,
step=range_index_meta['step'],
name=range_index_meta['name']
)
Expand Down Expand Up @@ -260,7 +267,9 @@ def read_parquet_chunked(
row_groups=None,
use_pandas_metadata=True,
size_t chunk_read_limit=0,
size_t pass_read_limit=1024000000
size_t pass_read_limit=1024000000,
size_type nrows=-1,
int64_t skip_rows=0
):
# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
Expand All @@ -287,7 +296,9 @@ def read_parquet_chunked(
row_groups,
use_pandas_metadata,
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit
pass_read_limit=pass_read_limit,
skip_rows=skip_rows,
nrows=nrows,
)

tbl_w_meta = reader.read_chunk()
Expand Down Expand Up @@ -320,13 +331,16 @@ def read_parquet_chunked(
df = _process_metadata(df, column_names, child_names,
per_file_user_data, row_groups,
filepaths_or_buffers, pa_buffers,
allow_range_index, use_pandas_metadata)
allow_range_index, use_pandas_metadata,
nrows=nrows, skip_rows=skip_rows)
return df


cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None):
Expression filters=None,
size_type nrows=-1,
int64_t skip_rows=0):
"""
Cython function to call into libcudf API, see `read_parquet`.
Expand Down Expand Up @@ -362,6 +376,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
filters,
convert_strings_to_categories = False,
use_pandas_metadata = use_pandas_metadata,
skip_rows = skip_rows,
nrows = nrows,
)

df = cudf.DataFrame._from_data(
Expand All @@ -371,7 +387,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
df = _process_metadata(df, tbl_w_meta.column_names(include_children=False),
tbl_w_meta.child_names, tbl_w_meta.per_file_user_data,
row_groups, filepaths_or_buffers, pa_buffers,
allow_range_index, use_pandas_metadata)
allow_range_index, use_pandas_metadata,
nrows=nrows, skip_rows=skip_rows)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ cpdef read_parquet(
bool convert_strings_to_categories = *,
bool use_pandas_metadata = *,
int64_t skip_rows = *,
size_type num_rows = *,
size_type nrows = *,
# disabled see comment in parquet.pyx for more
# ReaderColumnSchema reader_column_schema = *,
# DataType timestamp_type = *
Expand Down
18 changes: 9 additions & 9 deletions python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ cdef parquet_reader_options _setup_parquet_reader_options(
bool convert_strings_to_categories = False,
bool use_pandas_metadata = True,
int64_t skip_rows = 0,
size_type num_rows = -1,
size_type nrows = -1,
# ReaderColumnSchema reader_column_schema = None,
# DataType timestamp_type = DataType(type_id.EMPTY)
):
Expand All @@ -40,8 +40,8 @@ cdef parquet_reader_options _setup_parquet_reader_options(
)
if row_groups is not None:
opts.set_row_groups(row_groups)
if num_rows != -1:
opts.set_num_rows(num_rows)
if nrows != -1:
opts.set_num_rows(nrows)
if skip_rows != 0:
opts.set_skip_rows(skip_rows)
if columns is not None:
Expand Down Expand Up @@ -73,7 +73,7 @@ cdef class ChunkedParquetReader:
Whether to convert string columns to the category type
skip_rows : int64_t, default 0
The number of rows to skip from the start of the file.
num_rows : size_type, default -1
nrows : size_type, default -1
The number of rows to read. By default, read the entire file.
chunk_read_limit : size_t, default 0
Limit on total number of bytes to be returned per read,
Expand All @@ -90,7 +90,7 @@ cdef class ChunkedParquetReader:
bool use_pandas_metadata=True,
bool convert_strings_to_categories=False,
int64_t skip_rows = 0,
size_type num_rows = -1,
size_type nrows = -1,
size_t chunk_read_limit=0,
size_t pass_read_limit=1024000000
):
Expand All @@ -103,7 +103,7 @@ cdef class ChunkedParquetReader:
convert_strings_to_categories=convert_strings_to_categories,
use_pandas_metadata=use_pandas_metadata,
skip_rows=skip_rows,
num_rows=num_rows,
nrows=nrows,
)

with nogil:
Expand Down Expand Up @@ -152,7 +152,7 @@ cpdef read_parquet(
bool convert_strings_to_categories = False,
bool use_pandas_metadata = True,
int64_t skip_rows = 0,
size_type num_rows = -1,
size_type nrows = -1,
# Disabled, these aren't used by cudf-python
# we should only add them back in if there's user demand
# ReaderColumnSchema reader_column_schema = None,
Expand All @@ -178,7 +178,7 @@ cpdef read_parquet(
the per-file user metadata of the ``TableWithMetadata``
skip_rows : int64_t, default 0
The number of rows to skip from the start of the file.
num_rows : size_type, default -1
nrows : size_type, default -1
The number of rows to read. By default, read the entire file.
Returns
Expand All @@ -195,7 +195,7 @@ cpdef read_parquet(
convert_strings_to_categories,
use_pandas_metadata,
skip_rows,
num_rows,
nrows,
)

with nogil:
Expand Down
23 changes: 23 additions & 0 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,8 @@ def read_parquet(
open_file_options=None,
bytes_per_thread=None,
dataset_kwargs=None,
nrows=None,
skip_rows=None,
*args,
**kwargs,
):
Expand Down Expand Up @@ -685,6 +687,8 @@ def read_parquet(
partition_keys=partition_keys,
partition_categories=partition_categories,
dataset_kwargs=dataset_kwargs,
nrows=nrows,
skip_rows=skip_rows,
**kwargs,
)
# Apply filters row-wise (if any are defined), and return
Expand Down Expand Up @@ -813,18 +817,27 @@ def _parquet_to_frame(
partition_keys=None,
partition_categories=None,
dataset_kwargs=None,
nrows=None,
skip_rows=None,
**kwargs,
):
# If this is not a partitioned read, only need
# one call to `_read_parquet`
if not partition_keys:
return _read_parquet(
paths_or_buffers,
nrows=nrows,
skip_rows=skip_rows,
*args,
row_groups=row_groups,
**kwargs,
)

if nrows is not None or skip_rows is not None:
raise NotImplementedError(
"nrows/skip_rows is not supported when reading a partitioned parquet dataset"
)

partition_meta = None
partitioning = (dataset_kwargs or {}).get("partitioning", None)
if hasattr(partitioning, "schema"):
Expand Down Expand Up @@ -912,6 +925,8 @@ def _read_parquet(
columns=None,
row_groups=None,
use_pandas_metadata=None,
nrows=None,
skip_rows=None,
*args,
**kwargs,
):
Expand All @@ -934,13 +949,21 @@ def _read_parquet(
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
nrows=nrows if nrows is not None else -1,
skip_rows=skip_rows if skip_rows is not None else 0,
)
else:
if nrows is None:
nrows = -1
if skip_rows is None:
skip_rows = 0
return libparquet.read_parquet(
filepaths_or_buffers,
columns=columns,
row_groups=row_groups,
use_pandas_metadata=use_pandas_metadata,
nrows=nrows,
skip_rows=skip_rows,
)
else:
if (
Expand Down
2 changes: 1 addition & 1 deletion python/cudf/cudf/pylibcudf_tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def test_read_parquet_basic(

res = plc.io.parquet.read_parquet(
plc.io.SourceInfo([source]),
num_rows=nrows,
nrows=nrows,
skip_rows=skiprows,
columns=columns,
)
Expand Down
39 changes: 39 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1978,6 +1978,25 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename):
assert fn == filename


@pytest.mark.parametrize("kwargs", [{"nrows": 1}, {"skip_rows": 1}])
def test_parquet_partitioned_notimplemented(tmpdir_factory, kwargs):
# Checks that write_to_dataset is wrapping to_parquet
# as expected
pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
size = 100
pdf = pd.DataFrame(
{
"a": np.arange(0, stop=size, dtype="int64"),
"b": np.random.choice(list("abcd"), size=size),
"c": np.random.choice(np.arange(4), size=size),
}
)
pdf.to_parquet(pdf_dir, index=False, partition_cols=["b"])

with pytest.raises(NotImplementedError):
cudf.read_parquet(pdf_dir, **kwargs)


@pytest.mark.parametrize("return_meta", [True, False])
def test_parquet_writer_chunked_partitioned(tmpdir_factory, return_meta):
pdf_dir = str(tmpdir_factory.mktemp("pdf_dir"))
Expand Down Expand Up @@ -3768,6 +3787,26 @@ def test_parquet_chunked_reader(
assert_eq(expected, actual)


@pytest.mark.parametrize(
"nrows,skip_rows",
[
(0, 0),
(1000, 0),
(0, 1000),
(1000, 10000),
],
)
def test_parquet_reader_nrows_skiprows(nrows, skip_rows):
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 100000, "b": ["av", "qw", "hi", "xyz"] * 100000}
)
expected = df[skip_rows : skip_rows + nrows]
buffer = BytesIO()
df.to_parquet(buffer)
got = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows)
assert_eq(expected, got)


def test_parquet_reader_pandas_compatibility():
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000}
Expand Down
10 changes: 10 additions & 0 deletions python/cudf/cudf/utils/ioutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,16 @@
in parallel (using a python thread pool). Default allocation is
{bytes_per_thread} bytes.
This parameter is functional only when `use_python_file_object=False`.
skiprows : int, default None
If not None, the number of rows to skip from the start of the file.
.. note::
This option is not supported when the low-memory mode is on.
nrows : int, default None
If not None, the total number of rows to read.
.. note:
This option is not supported when the low-memory mode is on.
Returns
-------
Expand Down
2 changes: 1 addition & 1 deletion python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(self.paths),
columns=with_columns,
num_rows=nrows,
nrows=nrows,
)
df = DataFrame.from_table(
tbl_w_meta.tbl,
Expand Down

0 comments on commit 9d0c57a

Please sign in to comment.