From 164533ec9cf7f3569aa51eb2bf764df95371d8b6 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Fri, 5 Jul 2024 22:53:11 +0000 Subject: [PATCH 01/20] wip --- python/cudf/cudf/_lib/parquet.pyx | 26 ++++++++++--- .../_lib/pylibcudf/libcudf/io/parquet.pxd | 4 +- python/cudf/cudf/io/parquet.py | 24 ++++++++++++ python/cudf/cudf/tests/test_parquet.py | 23 +++++++++++ python/cudf/cudf/utils/ioutils.py | 4 ++ python/cudf_polars/cudf_polars/dsl/ir.py | 39 ++++++++++++++----- python/cudf_polars/tests/test_scan.py | 17 ++++---- 7 files changed, 114 insertions(+), 23 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d1ec5be9e62..ec8c23a350d 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -23,7 +23,7 @@ from cudf._lib.utils cimport data_from_unique_ptr from cudf._lib import pylibcudf from cudf._lib.utils import _index_level_name, generate_pandas_metadata -from libc.stdint cimport uint8_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr @@ -138,7 +138,9 @@ cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options( vector[vector[size_type]] row_groups, bool use_pandas_metadata, Expression filters, - object columns): + object columns, + size_type num_rows, + int64_t skip_rows): cdef parquet_reader_options args cdef parquet_reader_options_builder builder @@ -164,6 +166,12 @@ cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options( for col in columns: cpp_columns.push_back(str(col).encode()) args.set_columns(cpp_columns) + + if num_rows != -1: + args.set_num_rows(num_rows) + if skip_rows != 0: + args.set_skip_rows(skip_rows) + allow_range_index &= filters is None return pair[parquet_reader_options, bool](args, allow_range_index) @@ -299,7 +307,9 @@ cdef object _process_metadata(object df, cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, - Expression filters=None): + Expression filters=None, + size_type nrows=-1, + int64_t skip_rows=0): """ Cython function to call into libcudf API, see `read_parquet`. @@ -332,7 +342,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, # Setup parquet reader arguments cdef parquet_reader_options args cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( - source, cpp_row_groups, use_pandas_metadata, filters, columns) + source, cpp_row_groups, use_pandas_metadata, filters, columns, + nrows, skip_rows) args, allow_range_index = c_res.first, c_res.second # Read Parquet @@ -818,7 +829,9 @@ cdef class ParquetReader: def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, size_t chunk_read_limit=0, - size_t pass_read_limit=1024000000): + size_t pass_read_limit=1024000000, + size_type num_rows=-1, + int64_t skip_rows=0): # Convert NativeFile buffers to NativeFileDatasource, # but save original buffers in case we need to use @@ -841,7 +854,8 @@ cdef class ParquetReader: cpp_row_groups = row_groups cdef parquet_reader_options args cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( - source, cpp_row_groups, use_pandas_metadata, None, columns) + source, cpp_row_groups, use_pandas_metadata, + None, columns, num_rows, skip_rows) args, self.allow_range_index = c_res.first, c_res.second with nogil: diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 0ef6553db56..e6c354e0950 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -1,6 +1,6 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. -from libc.stdint cimport uint8_t +from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.functional cimport reference_wrapper from libcpp.map cimport map @@ -28,7 +28,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: # setter void set_columns(vector[string] col_names) except + + void set_num_rows(size_type val) except + void set_row_groups(vector[vector[size_type]] row_grp) except + + void set_skip_rows(int64_t val) except + void enable_use_arrow_schema(bool val) except + void enable_use_pandas_metadata(bool val) except + void set_timestamp_type(data_type type) except + diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 7733e770d99..449b3b7605f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -529,6 +529,8 @@ def read_parquet( open_file_options=None, bytes_per_thread=None, dataset_kwargs=None, + nrows=None, + skip_rows=None, *args, **kwargs, ): @@ -664,6 +666,8 @@ def read_parquet( partition_keys=partition_keys, partition_categories=partition_categories, dataset_kwargs=dataset_kwargs, + nrows=nrows, + skip_rows=skip_rows, **kwargs, ) @@ -793,6 +797,8 @@ def _parquet_to_frame( partition_keys=None, partition_categories=None, dataset_kwargs=None, + nrows=None, + skip_rows=None, **kwargs, ): # If this is not a partitioned read, only need @@ -800,6 +806,8 @@ def _parquet_to_frame( if not partition_keys: return _read_parquet( paths_or_buffers, + nrows=nrows, + skip_rows=skip_rows, *args, row_groups=row_groups, **kwargs, @@ -834,6 +842,12 @@ def _parquet_to_frame( key_paths, *args, row_groups=key_row_groups, + # TODO: is this still right? + # Also, do we still care? + # partition_keys uses pyarrow dataset + # (which we can't use anymore after pyarrow is gone) + nrows=nrows, + skip_rows=skip_rows, **kwargs, ) ) @@ -892,9 +906,15 @@ def _read_parquet( columns=None, row_groups=None, use_pandas_metadata=None, + nrows=None, + skip_rows=None, *args, **kwargs, ): + if nrows is None: + nrows = -1 + if skip_rows is None: + skip_rows = 0 # Simple helper function to dispatch between # cudf and pyarrow to read parquet data if engine == "cudf": @@ -914,6 +934,8 @@ def _read_parquet( columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, + nrows=nrows, + skip_rows=skip_rows, ).read() else: return libparquet.read_parquet( @@ -921,6 +943,8 @@ def _read_parquet( columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, + nrows=nrows, + skip_rows=skip_rows, ) else: if ( diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 588bc87d268..9c5d6a51a0a 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3496,3 +3496,26 @@ def test_parquet_reader_pandas_compatibility(): with cudf.option_context("mode.pandas_compatible", True): expected = cudf.read_parquet(buffer) assert_eq(expected, df) + + +@pytest.mark.parametrize("row_group_size", [1, 4, 33]) +def test_parquet_read_rows(tmpdir, pdf, row_group_size): + if len(pdf) > 100: + pytest.skip("Skipping long setup test") + + fname = tmpdir.join("row_group.parquet") + pdf.to_parquet(fname, compression="None", row_group_size=row_group_size) + + total_rows, _, _, _, _ = cudf.io.read_parquet_metadata(fname) + + num_rows = total_rows // 4 + skip_rows = (total_rows - num_rows) // 2 + gdf = cudf.read_parquet(fname, skip_rows=skip_rows, num_rows=num_rows) + + # cudf doesn't preserve category dtype + if "col_category" in pdf.columns: + pdf = pdf.drop(columns=["col_category"]) + if "col_category" in gdf.columns: + gdf = gdf.drop(columns=["col_category"]) + + assert_eq(pdf.iloc[skip_rows : skip_rows + num_rows], gdf) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 0209c692935..1d8ac556df5 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -190,6 +190,10 @@ in parallel (using a python thread pool). Default allocation is {bytes_per_thread} bytes. This parameter is functional only when `use_python_file_object=False`. +skiprows : int, default None + If not None, the number of rows to skip from the start of the file. +nrows : int, default None + If not None, the total number of rows to read. Returns ------- diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 6b552642e88..ddfd10d56f2 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -203,8 +203,9 @@ class Scan(IR): def __post_init__(self) -> None: """Validate preconditions.""" - if self.file_options.n_rows is not None: - raise NotImplementedError("row limit in scan") + self.opts, self.cloud_opts = map(json.loads, self.options) + if self.opts["skip_rows_after_header"] > 0: + raise NotImplementedError("skip_rows_after_header=False") if self.typ not in ("csv", "parquet"): raise NotImplementedError( f"Unhandled scan type: {self.typ}" @@ -216,15 +217,35 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: with_columns = options.with_columns row_index = options.row_index if self.typ == "csv": - opts, cloud_opts = map(json.loads, self.options) - df = DataFrame.from_cudf( - cudf.concat( - [cudf.read_csv(p, usecols=with_columns) for p in self.paths] - ) + # TODO: verify multi-file nrow behavior + skip_rows = self.opts["skip_rows"] + header = 0 if self.opts["has_header"] else None + gdf = cudf.concat( + [ + cudf.read_csv( + p, + header=header, + skiprows=skip_rows, + nrows=self.file_options.n_rows, + usecols=with_columns, + ) + for p in self.paths + ] ) + if not self.opts["has_header"]: + # Rename columns to use column_x format (where x starts from 1) + # if there is no header following polars + print(list(gdf.columns)) + gdf = gdf.rename( + columns={ + col: f"column_{i + 1}" for i, col in enumerate(gdf.columns) + } + ) + df = DataFrame.from_cudf(gdf) elif self.typ == "parquet": - opts, cloud_opts = map(json.loads, self.options) - cdf = cudf.read_parquet(self.paths, columns=with_columns) + cdf = cudf.read_parquet( + self.paths, nrows=self.file_options.n_rows, columns=with_columns + ) assert isinstance(cdf, cudf.DataFrame) df = DataFrame.from_cudf(cdf) else: diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index f129cc7ca32..1e0853eda19 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -23,12 +23,14 @@ def row_index(request): @pytest.fixture( params=[ (None, 0), - pytest.param( - (2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan") - ), - pytest.param( - (3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan") - ), + (2, 1), + (3, 0), + # pytest.param( + # (2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + # ), + # pytest.param( + # (3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + # ), ], ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], ) @@ -53,7 +55,8 @@ def df(request, tmp_path, row_index, n_rows_skip_rows): tmp_path / "file.csv", row_index_name=name, row_index_offset=offset, - skip_rows_after_header=skip_rows, + has_header=False, + skip_rows=skip_rows, n_rows=n_rows, ) else: From 3b0427a7e17199443772e04ba471026bd5159d15 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 8 Jul 2024 14:08:37 +0000 Subject: [PATCH 02/20] revert polars changes --- python/cudf_polars/cudf_polars/dsl/ir.py | 39 ++++++------------------ python/cudf_polars/tests/test_scan.py | 17 +++++------ 2 files changed, 16 insertions(+), 40 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index ddfd10d56f2..6b552642e88 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -203,9 +203,8 @@ class Scan(IR): def __post_init__(self) -> None: """Validate preconditions.""" - self.opts, self.cloud_opts = map(json.loads, self.options) - if self.opts["skip_rows_after_header"] > 0: - raise NotImplementedError("skip_rows_after_header=False") + if self.file_options.n_rows is not None: + raise NotImplementedError("row limit in scan") if self.typ not in ("csv", "parquet"): raise NotImplementedError( f"Unhandled scan type: {self.typ}" @@ -217,35 +216,15 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: with_columns = options.with_columns row_index = options.row_index if self.typ == "csv": - # TODO: verify multi-file nrow behavior - skip_rows = self.opts["skip_rows"] - header = 0 if self.opts["has_header"] else None - gdf = cudf.concat( - [ - cudf.read_csv( - p, - header=header, - skiprows=skip_rows, - nrows=self.file_options.n_rows, - usecols=with_columns, - ) - for p in self.paths - ] - ) - if not self.opts["has_header"]: - # Rename columns to use column_x format (where x starts from 1) - # if there is no header following polars - print(list(gdf.columns)) - gdf = gdf.rename( - columns={ - col: f"column_{i + 1}" for i, col in enumerate(gdf.columns) - } + opts, cloud_opts = map(json.loads, self.options) + df = DataFrame.from_cudf( + cudf.concat( + [cudf.read_csv(p, usecols=with_columns) for p in self.paths] ) - df = DataFrame.from_cudf(gdf) - elif self.typ == "parquet": - cdf = cudf.read_parquet( - self.paths, nrows=self.file_options.n_rows, columns=with_columns ) + elif self.typ == "parquet": + opts, cloud_opts = map(json.loads, self.options) + cdf = cudf.read_parquet(self.paths, columns=with_columns) assert isinstance(cdf, cudf.DataFrame) df = DataFrame.from_cudf(cdf) else: diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 1e0853eda19..f129cc7ca32 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -23,14 +23,12 @@ def row_index(request): @pytest.fixture( params=[ (None, 0), - (2, 1), - (3, 0), - # pytest.param( - # (2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan") - # ), - # pytest.param( - # (3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan") - # ), + pytest.param( + (2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + ), + pytest.param( + (3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + ), ], ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], ) @@ -55,8 +53,7 @@ def df(request, tmp_path, row_index, n_rows_skip_rows): tmp_path / "file.csv", row_index_name=name, row_index_offset=offset, - has_header=False, - skip_rows=skip_rows, + skip_rows_after_header=skip_rows, n_rows=n_rows, ) else: From 03c18897de864bf92d45c14ca790bfbdfd5cd173 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 8 Jul 2024 14:23:21 +0000 Subject: [PATCH 03/20] fixes --- python/cudf/cudf/_lib/parquet.pyx | 4 ++-- python/cudf/cudf/tests/test_parquet.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index ec8c23a350d..0afa0b1c0df 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -830,7 +830,7 @@ cdef class ParquetReader: use_pandas_metadata=True, size_t chunk_read_limit=0, size_t pass_read_limit=1024000000, - size_type num_rows=-1, + size_type nrows=-1, int64_t skip_rows=0): # Convert NativeFile buffers to NativeFileDatasource, @@ -855,7 +855,7 @@ cdef class ParquetReader: cdef parquet_reader_options args cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( source, cpp_row_groups, use_pandas_metadata, - None, columns, num_rows, skip_rows) + None, columns, nrows, skip_rows) args, self.allow_range_index = c_res.first, c_res.second with nogil: diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 9c5d6a51a0a..fc5b895d3bf 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3510,7 +3510,7 @@ def test_parquet_read_rows(tmpdir, pdf, row_group_size): num_rows = total_rows // 4 skip_rows = (total_rows - num_rows) // 2 - gdf = cudf.read_parquet(fname, skip_rows=skip_rows, num_rows=num_rows) + gdf = cudf.read_parquet(fname, skip_rows=skip_rows, nrows=num_rows) # cudf doesn't preserve category dtype if "col_category" in pdf.columns: From bf5e902f4c6976efea93f89d472688b4218f71ce Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 8 Jul 2024 17:21:27 +0000 Subject: [PATCH 04/20] rollback changes to chunked parquet reader --- python/cudf/cudf/_lib/parquet.pyx | 10 ++++------ python/cudf/cudf/io/parquet.py | 6 ++++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 0afa0b1c0df..0e9f9558110 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -139,8 +139,8 @@ cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options( bool use_pandas_metadata, Expression filters, object columns, - size_type num_rows, - int64_t skip_rows): + size_type num_rows=-1, + int64_t skip_rows=0): cdef parquet_reader_options args cdef parquet_reader_options_builder builder @@ -829,9 +829,7 @@ cdef class ParquetReader: def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, size_t chunk_read_limit=0, - size_t pass_read_limit=1024000000, - size_type nrows=-1, - int64_t skip_rows=0): + size_t pass_read_limit=1024000000): # Convert NativeFile buffers to NativeFileDatasource, # but save original buffers in case we need to use @@ -855,7 +853,7 @@ cdef class ParquetReader: cdef parquet_reader_options args cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( source, cpp_row_groups, use_pandas_metadata, - None, columns, nrows, skip_rows) + None, columns) args, self.allow_range_index = c_res.first, c_res.second with nogil: diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 449b3b7605f..179d9da38dd 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -929,13 +929,15 @@ def _read_parquet( f"following positional arguments: {list(args)}" ) if cudf.get_option("mode.pandas_compatible"): + # TODO: consider plumbing nrows/skiprows through to parquet reader + # (It's not super important now since pandas doesn't support it ATM, + # but may be relevant in the future) + # xref https://github.com/pandas-dev/pandas/issues/51830 return libparquet.ParquetReader( filepaths_or_buffers, columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, - nrows=nrows, - skip_rows=skip_rows, ).read() else: return libparquet.read_parquet( From 56c88ed3f3f2f4da616f18cd15ff0f6673b44c19 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 8 Jul 2024 17:22:31 +0000 Subject: [PATCH 05/20] revert changes to parquetreader --- python/cudf/cudf/_lib/parquet.pyx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 0e9f9558110..83dfaf2321f 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -852,8 +852,7 @@ cdef class ParquetReader: cpp_row_groups = row_groups cdef parquet_reader_options args cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( - source, cpp_row_groups, use_pandas_metadata, - None, columns) + source, cpp_row_groups, use_pandas_metadata, None, columns) args, self.allow_range_index = c_res.first, c_res.second with nogil: From f52b6066379140c8e5bc5ebb22c59e187696001a Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 8 Jul 2024 17:34:07 +0000 Subject: [PATCH 06/20] raise notimplemented for chunked parquet reader nrows/skiprows --- python/cudf/cudf/io/parquet.py | 16 ++++++++++++---- python/cudf/cudf/tests/test_parquet.py | 16 ++++++++++++++++ python/cudf/cudf/utils/ioutils.py | 6 ++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 179d9da38dd..8c8b66dba0b 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -911,10 +911,6 @@ def _read_parquet( *args, **kwargs, ): - if nrows is None: - nrows = -1 - if skip_rows is None: - skip_rows = 0 # Simple helper function to dispatch between # cudf and pyarrow to read parquet data if engine == "cudf": @@ -933,6 +929,14 @@ def _read_parquet( # (It's not super important now since pandas doesn't support it ATM, # but may be relevant in the future) # xref https://github.com/pandas-dev/pandas/issues/51830 + if nrows is not None: + raise NotImplementedError( + "pandas compatibility mode doesn't support nrows in read_parquet" + ) + if skip_rows is not None: + raise NotImplementedError( + "pandas compatibility mode doesn't support skip_rows in read_parquet" + ) return libparquet.ParquetReader( filepaths_or_buffers, columns=columns, @@ -940,6 +944,10 @@ def _read_parquet( use_pandas_metadata=use_pandas_metadata, ).read() else: + if nrows is None: + nrows = -1 + if skip_rows is None: + skip_rows = 0 return libparquet.read_parquet( filepaths_or_buffers, columns=columns, diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index fc5b895d3bf..c50d5a93442 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3498,6 +3498,22 @@ def test_parquet_reader_pandas_compatibility(): assert_eq(expected, df) +@pytest.mark.parametrize( + "kwargs", + [ + {"skip_rows": 0}, + {"nrows": 1}, + ], +) +def test_parquet_reader_pandas_compatibility_unsupported(kwargs): + df = pd.DataFrame({"a": [1, 2, 3, 4], "b": ["av", "qw", "hi", "xyz"]}) + buffer = BytesIO() + df.to_parquet(buffer) + with cudf.option_context("mode.pandas_compatible", True): + with pytest.raises(NotImplementedError): + cudf.read_parquet(buffer, **kwargs) + + @pytest.mark.parametrize("row_group_size", [1, 4, 33]) def test_parquet_read_rows(tmpdir, pdf, row_group_size): if len(pdf) > 100: diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 1d8ac556df5..788defa014c 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -192,9 +192,15 @@ This parameter is functional only when `use_python_file_object=False`. skiprows : int, default None If not None, the number of rows to skip from the start of the file. + + .. pandas-compat:: + This option is not supported when pandas compatibility mode is on. nrows : int, default None If not None, the total number of rows to read. + .. pandas-compat:: + This option is not supported when pandas compatibility mode is on. + Returns ------- DataFrame From 3eeb95a28d5e6374c3a505ce193a3c03a3fdd7dd Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 8 Jul 2024 21:37:09 +0000 Subject: [PATCH 07/20] fix docs --- python/cudf/cudf/utils/ioutils.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 788defa014c..cbe3da261a3 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -193,13 +193,13 @@ skiprows : int, default None If not None, the number of rows to skip from the start of the file. - .. pandas-compat:: - This option is not supported when pandas compatibility mode is on. + .. note:: + This option is not supported when pandas compatibility mode is on. nrows : int, default None If not None, the total number of rows to read. - .. pandas-compat:: - This option is not supported when pandas compatibility mode is on. + .. note: + This option is not supported when pandas compatibility mode is on. Returns ------- From cc3773746cae461c6164e5d2314f65c17aaac48b Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 9 Jul 2024 21:44:30 +0000 Subject: [PATCH 08/20] notimplemented for partitioned as well --- python/cudf/cudf/io/parquet.py | 11 +++++------ python/cudf/cudf/tests/test_parquet.py | 20 ++++++++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 8c8b66dba0b..f205a615a80 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -813,6 +813,11 @@ def _parquet_to_frame( **kwargs, ) + if nrows is not None or skip_rows is not None: + raise NotImplementedError( + "nrows/skip_rows is not supported when reading a partitioned parquet dataset" + ) + partition_meta = None partitioning = (dataset_kwargs or {}).get("partitioning", None) if hasattr(partitioning, "schema"): @@ -842,12 +847,6 @@ def _parquet_to_frame( key_paths, *args, row_groups=key_row_groups, - # TODO: is this still right? - # Also, do we still care? - # partition_keys uses pyarrow dataset - # (which we can't use anymore after pyarrow is gone) - nrows=nrows, - skip_rows=skip_rows, **kwargs, ) ) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index c50d5a93442..18d19e7050f 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1970,6 +1970,26 @@ def test_parquet_partitioned(tmpdir_factory, cols, filename): assert fn == filename +@pytest.mark.parametrize("kwargs", [{"nrows": 1}, {"skip_rows": 1}]) +def test_parquet_partitioned_notimplemented(tmpdir_factory, kwargs): + # Checks that write_to_dataset is wrapping to_parquet + # as expected + pdf_dir = str(tmpdir_factory.mktemp("pdf_dir")) + size = 100 + pdf = pd.DataFrame( + { + "a": np.arange(0, stop=size, dtype="int64"), + "b": np.random.choice(list("abcd"), size=size), + "c": np.random.choice(np.arange(4), size=size), + } + ) + pdf.to_parquet(pdf_dir, index=False, partition_cols=["b"]) + + # Check that cudf and pd return the same read + with pytest.raises(NotImplementedError): + cudf.read_parquet(pdf_dir, **kwargs) + + @pytest.mark.parametrize("return_meta", [True, False]) def test_parquet_writer_chunked_partitioned(tmpdir_factory, return_meta): pdf_dir = str(tmpdir_factory.mktemp("pdf_dir")) From 9c6a5da061144023f444ac81d3e4dd77d11a93a9 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Fri, 12 Jul 2024 17:29:14 +0000 Subject: [PATCH 09/20] buggy chunked parquet reader nrows/skiprows --- python/cudf/cudf/_lib/parquet.pyx | 7 ++++-- python/cudf/cudf/io/parquet.py | 18 +++++++------- python/cudf/cudf/tests/test_parquet.py | 33 ++++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 10 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 9cf1b1d569e..cd2731bca3f 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -838,7 +838,9 @@ cdef class ParquetReader: def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, use_pandas_metadata=True, size_t chunk_read_limit=0, - size_t pass_read_limit=1024000000): + size_t pass_read_limit=1024000000, + size_type nrows=-1, + int64_t skip_rows=0): # Convert NativeFile buffers to NativeFileDatasource, # but save original buffers in case we need to use @@ -861,7 +863,8 @@ cdef class ParquetReader: cpp_row_groups = row_groups cdef parquet_reader_options args cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( - source, cpp_row_groups, use_pandas_metadata, None, columns) + source, cpp_row_groups, use_pandas_metadata, None, columns, + nrows, skip_rows) args, self.allow_range_index = c_res.first, c_res.second with nogil: diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 83ca7fa9db0..63a8e56b45d 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -936,19 +936,21 @@ def _read_parquet( # (It's not super important now since pandas doesn't support it ATM, # but may be relevant in the future) # xref https://github.com/pandas-dev/pandas/issues/51830 - if nrows is not None: - raise NotImplementedError( - "pandas compatibility mode doesn't support nrows in read_parquet" - ) - if skip_rows is not None: - raise NotImplementedError( - "pandas compatibility mode doesn't support skip_rows in read_parquet" - ) + # if nrows is not None: + # raise NotImplementedError( + # "pandas compatibility mode doesn't support nrows in read_parquet" + # ) + # if skip_rows is not None: + # raise NotImplementedError( + # "pandas compatibility mode doesn't support skip_rows in read_parquet" + # ) return libparquet.ParquetReader( filepaths_or_buffers, columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, + nrows=nrows, + skip_rows=skip_rows, ).read() else: if nrows is None: diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 3d8242a5c4b..dce2f2d4cee 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3786,6 +3786,39 @@ def test_parquet_chunked_reader( assert_eq(expected, actual) +@pytest.mark.parametrize("chunk_read_limit", [240]) # [0, 240, 102400000]) +@pytest.mark.parametrize("pass_read_limit", [240]) # [0, 240, 102400000]) +@pytest.mark.parametrize( + "nrows,skip_rows", + [ + # These are OK + # (0, 0), + # (1000, 0), + # This one is not OK + (1000, 10000), + ], +) +def test_parquet_chunked_reader_nrows_skiprows( + chunk_read_limit, pass_read_limit, nrows, skip_rows +): + df = pd.DataFrame( + {"a": [1, 2, 3, 4] * 100000, "b": ["av", "qw", "hi", "xyz"] * 100000} + ) + buffer = BytesIO() + df.to_parquet(buffer) + reader = ParquetReader( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + nrows=nrows, + skip_rows=skip_rows, + ) + buffer.seek(0) + expected = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) + actual = reader.read() + assert_eq(expected, actual) + + def test_parquet_reader_pandas_compatibility(): df = pd.DataFrame( {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000} From 4f929e5c48dd439fcc42d86a95b2c012f783a54e Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 22 Jul 2024 18:07:47 +0000 Subject: [PATCH 10/20] fix some tests --- python/cudf/cudf/io/parquet.py | 4 ++-- python/cudf/cudf/tests/test_parquet.py | 16 ---------------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 3c42f046b74..4a419a2fbb6 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -949,8 +949,8 @@ def _read_parquet( columns=columns, row_groups=row_groups, use_pandas_metadata=use_pandas_metadata, - nrows=nrows, - skip_rows=skip_rows, + nrows=nrows if nrows is not None else -1, + skip_rows=skip_rows if skip_rows is not None else 0, ) else: if nrows is None: diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e67e29f8c38..631ae3476d5 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3832,22 +3832,6 @@ def test_parquet_reader_pandas_compatibility(): assert_eq(expected, df) -@pytest.mark.parametrize( - "kwargs", - [ - {"skip_rows": 0}, - {"nrows": 1}, - ], -) -def test_parquet_reader_pandas_compatibility_unsupported(kwargs): - df = pd.DataFrame({"a": [1, 2, 3, 4], "b": ["av", "qw", "hi", "xyz"]}) - buffer = BytesIO() - df.to_parquet(buffer) - with cudf.option_context("mode.pandas_compatible", True): - with pytest.raises(NotImplementedError): - cudf.read_parquet(buffer, **kwargs) - - @pytest.mark.parametrize("row_group_size", [1, 4, 33]) def test_parquet_read_rows(tmpdir, pdf, row_group_size): if len(pdf) > 100: From 3bb52c1a61a34b023209c9bfd2417d3d19618cb8 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 22 Jul 2024 18:34:18 +0000 Subject: [PATCH 11/20] more data --- python/cudf/cudf/tests/test_parquet.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 631ae3476d5..dc575fc1e08 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3794,9 +3794,13 @@ def test_parquet_chunked_reader( "nrows,skip_rows", [ # These are OK - # (0, 0), - # (1000, 0), - # This one is not OK + # Note: there is a bug if we read the buffer with read_parquet_chunked + # before cudf.read_parquet, where the cudf.read_parquet call will + # produce an invalid DF + (0, 0), + (1000, 0), + (0, 1000), + # segfault in the chunked parquet reader (invalid device ordinal) (1000, 10000), ], ) @@ -3808,7 +3812,7 @@ def test_parquet_chunked_reader_nrows_skiprows( ) buffer = BytesIO() df.to_parquet(buffer) - reader = read_parquet_chunked( + actual = read_parquet_chunked( [buffer], chunk_read_limit=chunk_read_limit, pass_read_limit=pass_read_limit, @@ -3817,7 +3821,6 @@ def test_parquet_chunked_reader_nrows_skiprows( ) buffer.seek(0) expected = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) - actual = reader.read() assert_eq(expected, actual) From 7bd243834ea79d7b2e274cdd50b446474cca2780 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 23 Jul 2024 17:07:26 +0000 Subject: [PATCH 12/20] fix range index metadata processing --- python/cudf/cudf/_lib/parquet.pyx | 19 ++++++++++++++----- python/cudf/cudf/tests/test_parquet.py | 6 ++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 8efd09af885..be1d6d51cdf 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -132,7 +132,10 @@ cdef object _process_metadata(object df, object filepaths_or_buffers, list pa_buffers, bool allow_range_index, - bool use_pandas_metadata): + bool use_pandas_metadata, + size_type num_rows=-1, + int64_t skip_rows=0, + ): add_df_col_struct_names(df, child_names) index_col = None @@ -221,9 +224,13 @@ cdef object _process_metadata(object df, else: idx = cudf.Index(cudf.core.column.column_empty(0)) else: + start = range_index_meta["start"] + skip_rows + stop = range_index_meta["stop"] + if num_rows != -1: + stop = start + num_rows idx = cudf.RangeIndex( - start=range_index_meta['start'], - stop=range_index_meta['stop'], + start=start, + stop=stop, step=range_index_meta['step'], name=range_index_meta['name'] ) @@ -324,7 +331,8 @@ def read_parquet_chunked( df = _process_metadata(df, column_names, child_names, per_file_user_data, row_groups, filepaths_or_buffers, pa_buffers, - allow_range_index, use_pandas_metadata) + allow_range_index, use_pandas_metadata, + num_rows=nrows, skip_rows=skip_rows) return df @@ -379,7 +387,8 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, df = _process_metadata(df, tbl_w_meta.column_names(include_children=False), tbl_w_meta.child_names, tbl_w_meta.per_file_user_data, row_groups, filepaths_or_buffers, pa_buffers, - allow_range_index, use_pandas_metadata) + allow_range_index, use_pandas_metadata, + num_rows=nrows, skip_rows=skip_rows) return df cpdef read_parquet_metadata(filepaths_or_buffers): diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index dc575fc1e08..27d4486945a 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3794,9 +3794,6 @@ def test_parquet_chunked_reader( "nrows,skip_rows", [ # These are OK - # Note: there is a bug if we read the buffer with read_parquet_chunked - # before cudf.read_parquet, where the cudf.read_parquet call will - # produce an invalid DF (0, 0), (1000, 0), (0, 1000), @@ -3835,7 +3832,7 @@ def test_parquet_reader_pandas_compatibility(): assert_eq(expected, df) -@pytest.mark.parametrize("row_group_size", [1, 4, 33]) +@pytest.mark.parametrize("row_group_size", [None, 1, 4, 33]) def test_parquet_read_rows(tmpdir, pdf, row_group_size): if len(pdf) > 100: pytest.skip("Skipping long setup test") @@ -3847,6 +3844,7 @@ def test_parquet_read_rows(tmpdir, pdf, row_group_size): num_rows = total_rows // 4 skip_rows = (total_rows - num_rows) // 2 + print(num_rows, skip_rows) gdf = cudf.read_parquet(fname, skip_rows=skip_rows, nrows=num_rows) # cudf doesn't preserve category dtype From e1982faa33b3d8f487daf1d054834d18740d0aee Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 23 Jul 2024 12:31:38 -0700 Subject: [PATCH 13/20] Update python/cudf/cudf/tests/test_parquet.py Co-authored-by: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> --- python/cudf/cudf/tests/test_parquet.py | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 27d4486945a..d48ed68fa33 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3801,24 +3801,18 @@ def test_parquet_chunked_reader( (1000, 10000), ], ) -def test_parquet_chunked_reader_nrows_skiprows( - chunk_read_limit, pass_read_limit, nrows, skip_rows +def test_parquet_reader_nrows_skiprows( + nrows, skip_rows ): df = pd.DataFrame( {"a": [1, 2, 3, 4] * 100000, "b": ["av", "qw", "hi", "xyz"] * 100000} ) + expected = df[skip_rows:skip_rows+nrows] buffer = BytesIO() df.to_parquet(buffer) - actual = read_parquet_chunked( - [buffer], - chunk_read_limit=chunk_read_limit, - pass_read_limit=pass_read_limit, - nrows=nrows, - skip_rows=skip_rows, - ) - buffer.seek(0) - expected = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) - assert_eq(expected, actual) + got = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) + # Check results + assert_eq(expected, got) def test_parquet_reader_pandas_compatibility(): From 30faf889a64dead1a33b70cbc4f0b48aed348049 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 23 Jul 2024 19:41:53 +0000 Subject: [PATCH 14/20] update --- python/cudf/cudf/tests/test_parquet.py | 34 ++------------------------ python/cudf/cudf/utils/ioutils.py | 4 +-- 2 files changed, 4 insertions(+), 34 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index d48ed68fa33..273aa6a5cd6 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3788,26 +3788,20 @@ def test_parquet_chunked_reader( assert_eq(expected, actual) -@pytest.mark.parametrize("chunk_read_limit", [240]) # [0, 240, 102400000]) -@pytest.mark.parametrize("pass_read_limit", [240]) # [0, 240, 102400000]) @pytest.mark.parametrize( "nrows,skip_rows", [ - # These are OK (0, 0), (1000, 0), (0, 1000), - # segfault in the chunked parquet reader (invalid device ordinal) (1000, 10000), ], ) -def test_parquet_reader_nrows_skiprows( - nrows, skip_rows -): +def test_parquet_reader_nrows_skiprows(nrows, skip_rows): df = pd.DataFrame( {"a": [1, 2, 3, 4] * 100000, "b": ["av", "qw", "hi", "xyz"] * 100000} ) - expected = df[skip_rows:skip_rows+nrows] + expected = df[skip_rows : skip_rows + nrows] buffer = BytesIO() df.to_parquet(buffer) got = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) @@ -3824,27 +3818,3 @@ def test_parquet_reader_pandas_compatibility(): with cudf.option_context("io.parquet.low_memory", True): expected = cudf.read_parquet(buffer) assert_eq(expected, df) - - -@pytest.mark.parametrize("row_group_size", [None, 1, 4, 33]) -def test_parquet_read_rows(tmpdir, pdf, row_group_size): - if len(pdf) > 100: - pytest.skip("Skipping long setup test") - - fname = tmpdir.join("row_group.parquet") - pdf.to_parquet(fname, compression="None", row_group_size=row_group_size) - - total_rows, _, _, _, _ = cudf.io.read_parquet_metadata(fname) - - num_rows = total_rows // 4 - skip_rows = (total_rows - num_rows) // 2 - print(num_rows, skip_rows) - gdf = cudf.read_parquet(fname, skip_rows=skip_rows, nrows=num_rows) - - # cudf doesn't preserve category dtype - if "col_category" in pdf.columns: - pdf = pdf.drop(columns=["col_category"]) - if "col_category" in gdf.columns: - gdf = gdf.drop(columns=["col_category"]) - - assert_eq(pdf.iloc[skip_rows : skip_rows + num_rows], gdf) diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 4ed055e59b9..448a815fe1b 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -203,12 +203,12 @@ If not None, the number of rows to skip from the start of the file. .. note:: - This option is not supported when pandas compatibility mode is on. + This option is not supported when the low-memory mode is on. nrows : int, default None If not None, the total number of rows to read. .. note: - This option is not supported when pandas compatibility mode is on. + This option is not supported when the low-memory mode is on. Returns ------- From d1401841629e38e84c1a7a27b6881740165ee37c Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 29 Jul 2024 18:47:17 +0000 Subject: [PATCH 15/20] rename params --- python/cudf/cudf/_lib/parquet.pyx | 10 +++++----- python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx | 18 +++++++++--------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index be1d6d51cdf..2afee2c4c7b 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -133,7 +133,7 @@ cdef object _process_metadata(object df, list pa_buffers, bool allow_range_index, bool use_pandas_metadata, - size_type num_rows=-1, + size_type nrows=-1, int64_t skip_rows=0, ): @@ -298,7 +298,7 @@ def read_parquet_chunked( chunk_read_limit=chunk_read_limit, pass_read_limit=pass_read_limit, skip_rows=skip_rows, - num_rows=nrows, + nrows=nrows, ) tbl_w_meta = reader.read_chunk() @@ -332,7 +332,7 @@ def read_parquet_chunked( per_file_user_data, row_groups, filepaths_or_buffers, pa_buffers, allow_range_index, use_pandas_metadata, - num_rows=nrows, skip_rows=skip_rows) + nrows=nrows, skip_rows=skip_rows) return df @@ -377,7 +377,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, convert_strings_to_categories = False, use_pandas_metadata = use_pandas_metadata, skip_rows = skip_rows, - num_rows = nrows, + nrows = nrows, ) df = cudf.DataFrame._from_data( @@ -388,7 +388,7 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, tbl_w_meta.child_names, tbl_w_meta.per_file_user_data, row_groups, filepaths_or_buffers, pa_buffers, allow_range_index, use_pandas_metadata, - num_rows=nrows, skip_rows=skip_rows) + nrows=nrows, skip_rows=skip_rows) return df cpdef read_parquet_metadata(filepaths_or_buffers): diff --git a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx index 96119e1b714..84a79f9565f 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pyx @@ -26,7 +26,7 @@ cdef parquet_reader_options _setup_parquet_reader_options( bool convert_strings_to_categories = False, bool use_pandas_metadata = True, int64_t skip_rows = 0, - size_type num_rows = -1, + size_type nrows = -1, # ReaderColumnSchema reader_column_schema = None, # DataType timestamp_type = DataType(type_id.EMPTY) ): @@ -40,8 +40,8 @@ cdef parquet_reader_options _setup_parquet_reader_options( ) if row_groups is not None: opts.set_row_groups(row_groups) - if num_rows != -1: - opts.set_num_rows(num_rows) + if nrows != -1: + opts.set_num_rows(nrows) if skip_rows != 0: opts.set_skip_rows(skip_rows) if columns is not None: @@ -73,7 +73,7 @@ cdef class ChunkedParquetReader: Whether to convert string columns to the category type skip_rows : int64_t, default 0 The number of rows to skip from the start of the file. - num_rows : size_type, default -1 + nrows : size_type, default -1 The number of rows to read. By default, read the entire file. chunk_read_limit : size_t, default 0 Limit on total number of bytes to be returned per read, @@ -90,7 +90,7 @@ cdef class ChunkedParquetReader: bool use_pandas_metadata=True, bool convert_strings_to_categories=False, int64_t skip_rows = 0, - size_type num_rows = -1, + size_type nrows = -1, size_t chunk_read_limit=0, size_t pass_read_limit=1024000000 ): @@ -103,7 +103,7 @@ cdef class ChunkedParquetReader: convert_strings_to_categories=convert_strings_to_categories, use_pandas_metadata=use_pandas_metadata, skip_rows=skip_rows, - num_rows=num_rows, + nrows=nrows, ) with nogil: @@ -152,7 +152,7 @@ cpdef read_parquet( bool convert_strings_to_categories = False, bool use_pandas_metadata = True, int64_t skip_rows = 0, - size_type num_rows = -1, + size_type nrows = -1, # Disabled, these aren't used by cudf-python # we should only add them back in if there's user demand # ReaderColumnSchema reader_column_schema = None, @@ -178,7 +178,7 @@ cpdef read_parquet( the per-file user metadata of the ``TableWithMetadata`` skip_rows : int64_t, default 0 The number of rows to skip from the start of the file. - num_rows : size_type, default -1 + nrows : size_type, default -1 The number of rows to read. By default, read the entire file. Returns @@ -195,7 +195,7 @@ cpdef read_parquet( convert_strings_to_categories, use_pandas_metadata, skip_rows, - num_rows, + nrows, ) with nogil: From 07411c17477f912c3f41a1d5092928dfc2374b9c Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 30 Jul 2024 06:49:31 -0700 Subject: [PATCH 16/20] fix typo --- python/cudf/cudf/_lib/parquet.pyx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 2afee2c4c7b..4a4b13b0b31 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -226,8 +226,8 @@ cdef object _process_metadata(object df, else: start = range_index_meta["start"] + skip_rows stop = range_index_meta["stop"] - if num_rows != -1: - stop = start + num_rows + if nrows != -1: + stop = start + nrows idx = cudf.RangeIndex( start=start, stop=stop, From 18082c7feaf859bf8813f48a4fa18e602d027cbe Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 30 Jul 2024 16:05:39 +0000 Subject: [PATCH 17/20] another missed one --- python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd index 027f215fb91..93ef849b813 100644 --- a/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/io/parquet.pxd @@ -28,7 +28,7 @@ cpdef read_parquet( bool convert_strings_to_categories = *, bool use_pandas_metadata = *, int64_t skip_rows = *, - size_type num_rows = *, + size_type nrows = *, # disabled see comment in parquet.pyx for more # ReaderColumnSchema reader_column_schema = *, # DataType timestamp_type = * From fec25b016c583f3ec80dd89f862dde6dba7cf787 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Tue, 30 Jul 2024 10:57:04 -0700 Subject: [PATCH 18/20] fix pylibcudf tests --- python/cudf/cudf/pylibcudf_tests/io/test_parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py b/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py index 07d2ab3d69a..dbd20cd473e 100644 --- a/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py +++ b/python/cudf/cudf/pylibcudf_tests/io/test_parquet.py @@ -31,7 +31,7 @@ def test_read_parquet_basic( res = plc.io.parquet.read_parquet( plc.io.SourceInfo([source]), - num_rows=nrows, + nrows=nrows, skip_rows=skiprows, columns=columns, ) From a2fad688a488fc82ba660af22e7ae691f780664a Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Thu, 1 Aug 2024 15:11:10 +0000 Subject: [PATCH 19/20] last fixes --- python/cudf/cudf/tests/test_parquet.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 273aa6a5cd6..879a2c50db7 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1993,7 +1993,6 @@ def test_parquet_partitioned_notimplemented(tmpdir_factory, kwargs): ) pdf.to_parquet(pdf_dir, index=False, partition_cols=["b"]) - # Check that cudf and pd return the same read with pytest.raises(NotImplementedError): cudf.read_parquet(pdf_dir, **kwargs) @@ -3805,7 +3804,6 @@ def test_parquet_reader_nrows_skiprows(nrows, skip_rows): buffer = BytesIO() df.to_parquet(buffer) got = cudf.read_parquet(buffer, nrows=nrows, skip_rows=skip_rows) - # Check results assert_eq(expected, got) From 593ccd28d5d98009382b34b27f299a2db0ab5258 Mon Sep 17 00:00:00 2001 From: Thomas Li <47963215+lithomas1@users.noreply.github.com> Date: Thu, 1 Aug 2024 09:10:45 -0700 Subject: [PATCH 20/20] fix cudf-polars --- python/cudf_polars/cudf_polars/dsl/ir.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 7f62dff4389..3754addeb11 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -321,7 +321,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: tbl_w_meta = plc.io.parquet.read_parquet( plc.io.SourceInfo(self.paths), columns=with_columns, - num_rows=nrows, + nrows=nrows, ) df = DataFrame.from_table( tbl_w_meta.tbl,