diff --git a/cpp/benchmarks/io/parquet/parquet_reader.cpp b/cpp/benchmarks/io/parquet/parquet_reader.cpp index c66f0af2b76..5f32ebf6672 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader.cpp @@ -71,7 +71,7 @@ void BM_parq_read_varying_input(benchmark::State& state) std::vector get_col_names(cudf::io::source_info const& source) { cudf_io::parquet_reader_options const read_options = - cudf_io::parquet_reader_options::builder(source).num_rows(1); + cudf_io::parquet_reader_options::builder(source); return cudf_io::read_parquet(read_options).metadata.column_names; } @@ -113,9 +113,8 @@ void BM_parq_read_varying_options(benchmark::State& state) .use_pandas_metadata(use_pandas_metadata) .timestamp_type(ts_type); - auto const num_row_groups = data_size / (128 << 20); - cudf::size_type const chunk_row_cnt = view.num_rows() / num_chunks; - auto mem_stats_logger = cudf::memory_stats_logger(); + auto const num_row_groups = data_size / (128 << 20); + auto mem_stats_logger = cudf::memory_stats_logger(); for (auto _ : state) { try_drop_l3_cache(); cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 @@ -133,11 +132,7 @@ void BM_parq_read_varying_options(benchmark::State& state) } read_options.set_row_groups({row_groups_to_read}); } break; - case row_selection::NROWS: - read_options.set_skip_rows(chunk * chunk_row_cnt); - read_options.set_num_rows(chunk_row_cnt); - if (is_last_chunk) read_options.set_num_rows(-1); - break; + case row_selection::NROWS: [[fallthrough]]; default: CUDF_FAIL("Unsupported row selection method"); } @@ -186,24 +181,3 @@ BENCHMARK_REGISTER_F(ParquetRead, column_selection) // row_selection::ROW_GROUPS disabled until we add an API to read metadata from a parquet file and // determine num row groups. https://github.com/rapidsai/cudf/pull/9963#issuecomment-1004832863 -BENCHMARK_DEFINE_F(ParquetRead, row_selection) -(::benchmark::State& state) { BM_parq_read_varying_options(state); } -BENCHMARK_REGISTER_F(ParquetRead, row_selection) - ->ArgsProduct({{int32_t(column_selection::ALL)}, - {int32_t(row_selection::NROWS)}, - {1, 4}, - {0b01}, // defaults - {int32_t(cudf::type_id::EMPTY)}}) - ->Unit(benchmark::kMillisecond) - ->UseManualTime(); - -BENCHMARK_DEFINE_F(ParquetRead, misc_options) -(::benchmark::State& state) { BM_parq_read_varying_options(state); } -BENCHMARK_REGISTER_F(ParquetRead, misc_options) - ->ArgsProduct({{int32_t(column_selection::ALL)}, - {int32_t(row_selection::NROWS)}, - {1}, - {0b01, 0b00, 0b11, 0b010}, - {int32_t(cudf::type_id::EMPTY), int32_t(cudf::type_id::TIMESTAMP_NANOSECONDS)}}) - ->Unit(benchmark::kMillisecond) - ->UseManualTime(); diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 19156e01c1e..0673f73ef89 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -56,10 +56,6 @@ class parquet_reader_options { // List of individual row groups to read (ignored if empty) std::vector> _row_groups; - // Number of rows to skip from the start - size_type _skip_rows = 0; - // Number of rows to read; -1 is all - size_type _num_rows = -1; // Whether to store string data as categorical type bool _convert_strings_to_categories = false; @@ -133,20 +129,6 @@ class parquet_reader_options { return _convert_binary_to_strings; } - /** - * @brief Returns number of rows to skip from the start. - * - * @return Number of rows to skip from the start - */ - [[nodiscard]] size_type get_skip_rows() const { return _skip_rows; } - - /** - * @brief Returns number of rows to read. - * - * @return Number of rows to read - */ - [[nodiscard]] size_type get_num_rows() const { return _num_rows; } - /** * @brief Returns names of column to be read, if set. * @@ -182,10 +164,6 @@ class parquet_reader_options { */ void set_row_groups(std::vector> row_groups) { - if ((!row_groups.empty()) and ((_skip_rows != 0) or (_num_rows != -1))) { - CUDF_FAIL("row_groups can't be set along with skip_rows and num_rows"); - } - _row_groups = std::move(row_groups); } @@ -214,34 +192,6 @@ class parquet_reader_options { _convert_binary_to_strings = std::move(val); } - /** - * @brief Sets number of rows to skip. - * - * @param val Number of rows to skip from start - */ - void set_skip_rows(size_type val) - { - if ((val != 0) and (!_row_groups.empty())) { - CUDF_FAIL("skip_rows can't be set along with a non-empty row_groups"); - } - - _skip_rows = val; - } - - /** - * @brief Sets number of rows to read. - * - * @param val Number of rows to read after skip - */ - void set_num_rows(size_type val) - { - if ((val != -1) and (!_row_groups.empty())) { - CUDF_FAIL("num_rows can't be set along with a non-empty row_groups"); - } - - _num_rows = val; - } - /** * @brief Sets timestamp_type used to cast timestamp columns. * @@ -332,30 +282,6 @@ class parquet_reader_options_builder { return *this; } - /** - * @brief Sets number of rows to skip. - * - * @param val Number of rows to skip from start - * @return this for chaining - */ - parquet_reader_options_builder& skip_rows(size_type val) - { - options.set_skip_rows(val); - return *this; - } - - /** - * @brief Sets number of rows to read. - * - * @param val Number of rows to read after skip - * @return this for chaining - */ - parquet_reader_options_builder& num_rows(size_type val) - { - options.set_num_rows(val); - return *this; - } - /** * @brief timestamp_type used to cast timestamp columns. * diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 318f7138517..3e71c3d1a07 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -54,15 +54,13 @@ struct page_state_s { const uint8_t* data_start; const uint8_t* data_end; const uint8_t* lvl_end; - const uint8_t* dict_base; // ptr to dictionary page data - int32_t dict_size; // size of dictionary data - int32_t first_row; // First row in page to output - int32_t num_rows; // Rows in page to decode (including rows to be skipped) - int32_t first_output_value; // First value in page to output - int32_t num_input_values; // total # of input/level values in the page - int32_t dtype_len; // Output data type length - int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit - int32_t dict_bits; // # of bits to store dictionary indices + const uint8_t* dict_base; // ptr to dictionary page data + int32_t dict_size; // size of dictionary data + int32_t num_rows; // Rows in page to decode + int32_t num_input_values; // total # of input/level values in the page + int32_t dtype_len; // Output data type length + int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit + int32_t dict_bits; // # of bits to store dictionary indices uint32_t dict_run; int32_t dict_val; uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep] @@ -88,7 +86,6 @@ struct page_state_s { uint32_t def[non_zero_buffer_size]; // circular buffer of definition level values const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep] int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded - int32_t row_index_lower_bound; // lower bound of row indices we should process }; /** @@ -865,17 +862,14 @@ static __device__ void gpuOutputGeneric(volatile page_state_s* s, * @param[in, out] s The local page state to be filled in * @param[in] p The global page to be copied from * @param[in] chunks The global list of chunks - * @param[in] num_rows Maximum number of rows to read - * @param[in] min_row Crop all rows below min_row + * @param[in] num_rows Maximum number of rows to process */ static __device__ bool setupLocalPageInfo(page_state_s* const s, PageInfo const* p, device_span chunks, - size_t min_row, size_t num_rows) { - int t = threadIdx.x; - int chunk_idx; + int const t = threadIdx.x; // Fetch page info if (t == 0) s->page = *p; @@ -883,7 +877,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; } // Fetch column chunk info - chunk_idx = s->page.chunk_idx; + int const chunk_idx = s->page.chunk_idx; if (t == 0) { s->col = chunks[chunk_idx]; } // zero nested value and valid counts @@ -904,19 +898,18 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // our starting row (absolute index) is // col.start_row == absolute row index // page.chunk-row == relative row index within the chunk - size_t page_start_row = s->col.start_row + s->page.chunk_row; + size_t const page_start_row = s->col.start_row + s->page.chunk_row; // IMPORTANT : nested schemas can have 0 rows in a page but still have // values. The case is: // - On page N-1, the last row starts, with 2/6 values encoded // - On page N, the remaining 4/6 values are encoded, but there are no new rows. - // if (s->page.num_input_values > 0 && s->page.num_rows > 0) { if (s->page.num_input_values > 0) { - uint8_t* cur = s->page.page_data; - uint8_t* end = cur + s->page.uncompressed_page_size; + uint8_t const* cur = s->page.page_data; + uint8_t const* const end = cur + s->page.uncompressed_page_size; - uint32_t dtype_len_out = s->col.data_type >> 3; - s->ts_scale = 0; + uint32_t const dtype_len_out = s->col.data_type >> 3; + s->ts_scale = 0; // Validate data type auto const data_type = s->col.data_type & 7; switch (data_type) { @@ -965,17 +958,10 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dtype_len = 8; // Convert to 64-bit timestamp } - // first row within the page to output - if (page_start_row >= min_row) { - s->first_row = 0; - } else { - s->first_row = (int32_t)min(min_row - page_start_row, (size_t)s->page.num_rows); - } // # of rows within the page to output s->num_rows = s->page.num_rows; - if ((page_start_row + s->first_row) + s->num_rows > min_row + num_rows) { - s->num_rows = - (int32_t)max((int64_t)(min_row + num_rows - (page_start_row + s->first_row)), INT64_C(0)); + if (page_start_row + s->num_rows > num_rows) { + s->num_rows = (int32_t)max((int64_t)(num_rows - page_start_row), INT64_C(0)); } // during the decoding step we need to offset the global output buffers @@ -984,25 +970,18 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // - for flat schemas, we can do this directly by using row counts // - for nested schemas, these offsets are computed during the preprocess step if (s->col.column_data_base != nullptr) { - int max_depth = s->col.max_nesting_depth; + int const max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { PageNestingInfo* pni = &s->page.nesting[idx]; - size_t output_offset; - // schemas without lists - if (s->col.max_level[level_type::REPETITION] == 0) { - output_offset = page_start_row >= min_row ? page_start_row - min_row : 0; - } - // for schemas with lists, we've already got the exactly value precomputed - else { - output_offset = pni->page_start_value; - } + size_t const output_offset = + s->col.max_level[level_type::REPETITION] == 0 ? page_start_row : pni->page_start_value; pni->data_out = static_cast(s->col.column_data_base[idx]); if (pni->data_out != nullptr) { // anything below max depth with a valid data pointer must be a list, so the // element size is the size of the offset type. - uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; + uint32_t const len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; pni->data_out += (output_offset * len); } pni->valid_map = s->col.valid_map_base[idx]; @@ -1012,7 +991,6 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, } } } - s->first_output_value = 0; // Find the compressed size of repetition levels cur += InitLevelSection(s, cur, end, level_type::REPETITION); @@ -1065,53 +1043,9 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dict_pos = 0; s->src_pos = 0; - // for flat hierarchies, we can't know how many leaf values to skip unless we do a full - // preprocess of the definition levels (since nulls will have no actual decodable value, there - // is no direct correlation between # of rows and # of decodable values). so we will start - // processing at the beginning of the value stream and disregard any indices that start - // before the first row. - if (s->col.max_level[level_type::REPETITION] == 0) { - s->page.skipped_values = 0; - s->page.skipped_leaf_values = 0; - s->input_value_count = 0; - s->input_row_count = 0; - - s->row_index_lower_bound = -1; - } - // for nested hierarchies, we have run a preprocess that lets us skip directly to the values - // we need to start decoding at - else { - // input_row_count translates to "how many rows we have processed so far", so since we are - // skipping directly to where we want to start decoding, set it to first_row - s->input_row_count = s->first_row; - - // return the lower bound to compare (page-relative) thread row index against. Explanation: - // In the case of nested schemas, rows can span page boundaries. That is to say, - // we can encounter the first value for row X on page M, but the last value for page M - // might not be the last value for row X. page M+1 (or further) may contain the last value. - // - // This means that the first values we encounter for a given page (M+1) may not belong to the - // row indicated by chunk_row, but to the row before it that spanned page boundaries. If that - // previous row is within the overall row bounds, include the values by allowing relative row - // index -1 - int const max_row = (min_row + num_rows) - 1; - if (min_row < page_start_row && max_row >= page_start_row - 1) { - s->row_index_lower_bound = -1; - } else { - s->row_index_lower_bound = s->first_row; - } - - // if we're in the decoding step, jump directly to the first - // value we care about - if (s->col.column_data_base != nullptr) { - s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0; - } else { - s->input_value_count = 0; - s->input_leaf_count = 0; - s->page.skipped_values = -1; - s->page.skipped_leaf_values = -1; - } - } + s->input_value_count = 0; + s->input_row_count = 0; + s->input_leaf_count = 0; __threadfence_block(); } @@ -1257,10 +1191,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); input_row_count += __popc(warp_row_count_mask); // is this thread within read row bounds? - int const in_row_bounds = thread_row_index >= s->row_index_lower_bound && - thread_row_index < (s->first_row + s->num_rows) - ? 1 - : 0; + int const in_row_bounds = thread_row_index < s->num_rows; // compute warp and thread value counts uint32_t const warp_count_mask = @@ -1335,9 +1266,7 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // the correct position to start reading. since we are about to write the validity vector here // we need to adjust our computed mask to take into account the write row bounds. int const in_write_row_bounds = - max_depth == 1 - ? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows) - : in_row_bounds; + max_depth == 1 ? thread_row_index < s->num_rows : in_row_bounds; int const first_thread_in_write_range = max_depth == 1 ? __ffs(ballot(in_write_row_bounds)) - 1 : 0; // # of bits to of the validity mask to write out @@ -1425,16 +1354,11 @@ __device__ void gpuDecodeLevels(page_state_s* s, int32_t target_leaf_count, int * @param[in] s The local page info * @param[in] target_input_value_count The # of repetition/definition levels to process up to * @param[in] t Thread index - * @param[in] bounds_set Whether or not s->row_index_lower_bound, s->first_row and s->num_rows - * have been computed for this page (they will only be set in the second/trim pass). */ -static __device__ void gpuUpdatePageSizes(page_state_s* s, - int32_t target_input_value_count, - int t, - bool bounds_set) +static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_value_count, int t) { // max nesting depth of the column - int max_depth = s->col.max_nesting_depth; + int const max_depth = s->col.max_nesting_depth; // bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false; // how many input level values we've processed in the page so far int input_value_count = s->input_value_count; @@ -1449,44 +1373,23 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // count rows and leaf values - int is_new_row = start_depth == 0 ? 1 : 0; - uint32_t warp_row_count_mask = ballot(is_new_row); - int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; - uint32_t warp_leaf_count_mask = ballot(is_new_leaf); - - // is this thread within row bounds? on the first pass we don't know the bounds, so we will be - // computing the full size of the column. on the second pass, we will know our actual row - // bounds, so the computation will cap sizes properly. - int in_row_bounds = 1; - if (bounds_set) { - // absolute row index - int32_t thread_row_index = - input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); - in_row_bounds = thread_row_index >= s->row_index_lower_bound && - thread_row_index < (s->first_row + s->num_rows) - ? 1 - : 0; - - uint32_t row_bounds_mask = ballot(in_row_bounds); - int first_thread_in_range = __ffs(row_bounds_mask) - 1; - - // if we've found the beginning of the first row, mark down the position - // in the def/repetition buffer (skipped_values) and the data buffer (skipped_leaf_values) - if (!t && first_thread_in_range >= 0 && s->page.skipped_values < 0) { - // how many values we've skipped in the rep/def levels - s->page.skipped_values = input_value_count + first_thread_in_range; - // how many values we've skipped in the actual data stream - s->page.skipped_leaf_values = - input_leaf_count + __popc(warp_leaf_count_mask & ((1 << first_thread_in_range) - 1)); - } - } + int const is_new_row = start_depth == 0 ? 1 : 0; + uint32_t const warp_row_count_mask = ballot(is_new_row); + int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; + uint32_t const warp_leaf_count_mask = ballot(is_new_leaf); + + // is this thread within row bounds? + int32_t const thread_row_index = + input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); + int const in_row_bounds = thread_row_index < s->num_rows; // increment counts across all nesting depths for (int s_idx = 0; s_idx < max_depth; s_idx++) { // if we are within the range of nesting levels we should be adding value indices for - int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + int const in_nesting_bounds = + (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; - uint32_t count_mask = ballot(in_nesting_bounds); + uint32_t const count_mask = ballot(in_nesting_bounds); if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); } } @@ -1510,29 +1413,18 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, * * @param pages List of pages * @param chunks List of column chunks - * @param min_row Row index to start reading at - * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows. - * @param trim_pass Whether or not this is the trim pass. We first have to compute - * the full size information of every page before we come through in a second (trim) pass - * to determine what subset of rows in this page we should be reading. */ __global__ void __launch_bounds__(block_size) - gpuComputePageSizes(PageInfo* pages, - device_span chunks, - size_t min_row, - size_t num_rows, - bool trim_pass) + gpuComputePageSizes(PageInfo* pages, device_span chunks) { __shared__ __align__(16) page_state_s state_g; page_state_s* const s = &state_g; - int page_idx = blockIdx.x; - int t = threadIdx.x; - PageInfo* pp = &pages[page_idx]; + int const page_idx = blockIdx.x; + int const t = threadIdx.x; + PageInfo* const pp = &pages[page_idx]; - if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) { - return; - } + if (!setupLocalPageInfo(s, pp, chunks, INT_MAX)) { return; } // zero sizes int d = 0; @@ -1541,21 +1433,12 @@ __global__ void __launch_bounds__(block_size) d += blockDim.x; } if (!t) { - s->page.skipped_values = -1; - s->page.skipped_leaf_values = -1; - s->input_row_count = 0; - s->input_value_count = 0; - - // if this isn't the trim pass, make sure we visit absolutely everything - if (!trim_pass) { - s->first_row = 0; - s->num_rows = INT_MAX; - s->row_index_lower_bound = -1; - } + s->input_row_count = 0; + s->input_value_count = 0; } __syncthreads(); - bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; + bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step @@ -1574,22 +1457,18 @@ __global__ void __launch_bounds__(block_size) __syncwarp(); // we may have decoded different amounts from each stream, so only process what we've been - int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], - s->lvl_count[level_type::DEFINITION]) - : s->lvl_count[level_type::DEFINITION]; + int const actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back - gpuUpdatePageSizes(s, actual_input_count, t, trim_pass); + gpuUpdatePageSizes(s, actual_input_count, t); target_input_count = actual_input_count + batch_size; __syncwarp(); } } // update # rows in the actual page - if (!t) { - pp->num_rows = s->page.nesting[0].size; - pp->skipped_values = s->page.skipped_values; - pp->skipped_leaf_values = s->page.skipped_leaf_values; - } + if (!t) { pp->num_rows = s->page.nesting[0].size; } } /** @@ -1602,20 +1481,19 @@ __global__ void __launch_bounds__(block_size) * * @param pages List of pages * @param chunks List of column chunks - * @param min_row Row index to start reading at * @param num_rows Maximum number of rows to read */ -__global__ void __launch_bounds__(block_size) gpuDecodePageData( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(block_size) + gpuDecodePageData(PageInfo* pages, device_span chunks, size_t num_rows) { __shared__ __align__(16) page_state_s state_g; page_state_s* const s = &state_g; - int page_idx = blockIdx.x; - int t = threadIdx.x; + int const page_idx = blockIdx.x; + int const t = threadIdx.x; int out_thread0; - if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows)) { return; } + if (!setupLocalPageInfo(s, &pages[page_idx], chunks, num_rows)) { return; } if (s->dict_base) { out_thread0 = (s->dict_bits > 0) ? 64 : 32; @@ -1624,8 +1502,6 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32; } - // skipped_leaf_values will always be 0 for flat hierarchies. - uint32_t skipped_leaf_values = s->page.skipped_leaf_values; while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; @@ -1645,8 +1521,7 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( // - produces non-NULL value indices in s->nz_idx for subsequent decoding gpuDecodeLevels(s, target_pos, t); } else if (t < out_thread0) { - // skipped_leaf_values will always be 0 for flat hierarchies. - uint32_t src_target_pos = target_pos + skipped_leaf_values; + uint32_t src_target_pos = target_pos; // WARP1: Decode dictionary indices, booleans or string positions if (s->dict_base) { @@ -1659,70 +1534,51 @@ __global__ void __launch_bounds__(block_size) gpuDecodePageData( if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values - int dtype = s->col.data_type & 7; + int const dtype = s->col.data_type & 7; src_pos += t - out_thread0; // the position in the output column/buffer - int dst_pos = s->nz_idx[rolling_index(src_pos)]; - - // for the flat hierarchy case we will be reading from the beginning of the value stream, - // regardless of the value of first_row. so adjust our destination offset accordingly. - // example: - // - user has passed skip_rows = 2, so our first_row to output is 2 - // - the row values we get from nz_idx will be - // 0, 1, 2, 3, 4 .... - // - by shifting these values by first_row, the sequence becomes - // -1, -2, 0, 1, 2 ... - // - so we will end up ignoring the first two input rows, and input rows 2..n will - // get written to the output starting at position 0. - // - if (s->col.max_nesting_depth == 1) { dst_pos -= s->first_row; } + int const dst_pos = s->nz_idx[rolling_index(src_pos)]; // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values // before first_row) in the flat hierarchy case. if (src_pos < target_pos && dst_pos >= 0) { - // src_pos represents the logical row position we want to read from. But in the case of - // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position - // has to take into account the # of values we have to skip in the page to get to the - // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. - uint32_t val_src_pos = src_pos + skipped_leaf_values; - // nesting level that is storing actual leaf values - int leaf_level_index = s->col.max_nesting_depth - 1; + int const leaf_level_index = s->col.max_nesting_depth - 1; - uint32_t dtype_len = s->dtype_len; + uint32_t const dtype_len = s->dtype_len; void* dst = s->page.nesting[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; if (dtype == BYTE_ARRAY) { - gpuOutputString(s, val_src_pos, dst); + gpuOutputString(s, src_pos, dst); } else if (dtype == BOOLEAN) { - gpuOutputBoolean(s, val_src_pos, static_cast(dst)); + gpuOutputBoolean(s, src_pos, static_cast(dst)); } else if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: gpuOutputFast(s, val_src_pos, static_cast(dst)); break; - case INT64: gpuOutputFast(s, val_src_pos, static_cast(dst)); break; + case INT32: gpuOutputFast(s, src_pos, static_cast(dst)); break; + case INT64: gpuOutputFast(s, src_pos, static_cast(dst)); break; default: if (s->dtype_len_in <= sizeof(int32_t)) { - gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast(dst)); + gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast(dst)); } else if (s->dtype_len_in <= sizeof(int64_t)) { - gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast(dst)); + gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast(dst)); } else { - gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<__int128_t*>(dst)); + gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast<__int128_t*>(dst)); } break; } } else if (dtype == INT96) { - gpuOutputInt96Timestamp(s, val_src_pos, static_cast(dst)); + gpuOutputInt96Timestamp(s, src_pos, static_cast(dst)); } else if (dtype_len == 8) { if (s->ts_scale) { - gpuOutputInt64Timestamp(s, val_src_pos, static_cast(dst)); + gpuOutputInt64Timestamp(s, src_pos, static_cast(dst)); } else { - gpuOutputFast(s, val_src_pos, static_cast(dst)); + gpuOutputFast(s, src_pos, static_cast(dst)); } } else if (dtype_len == 4) { - gpuOutputFast(s, val_src_pos, static_cast(dst)); + gpuOutputFast(s, src_pos, static_cast(dst)); } else { - gpuOutputGeneric(s, val_src_pos, static_cast(dst), dtype_len); + gpuOutputGeneric(s, src_pos, static_cast(dst), dtype_len); } } @@ -1793,8 +1649,6 @@ void PreprocessColumnData(hostdevice_vector& pages, std::vector& input_columns, std::vector& output_columns, size_t num_rows, - size_t min_row, - bool uses_custom_row_bounds, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -1803,16 +1657,7 @@ void PreprocessColumnData(hostdevice_vector& pages, // computes: // PageNestingInfo::size for each level of nesting, for each page. - // This computes the size for the entire page, not taking row bounds into account. - // If uses_custom_row_bounds is set to true, we have to do a second pass later that "trims" - // the starting and ending read values to account for these bounds. - gpuComputePageSizes<<>>( - pages.device_ptr(), - chunks, - // if uses_custom_row_bounds is false, include all possible rows. - uses_custom_row_bounds ? min_row : 0, - uses_custom_row_bounds ? num_rows : INT_MAX, - !uses_custom_row_bounds); + gpuComputePageSizes<<>>(pages.device_ptr(), chunks); // computes: // PageInfo::chunk_row for all pages @@ -1826,16 +1671,6 @@ void PreprocessColumnData(hostdevice_vector& pages, page_input, chunk_row_output_iter{pages.device_ptr()}); - // computes: - // PageNestingInfo::size for each level of nesting, for each page, taking row bounds into account. - // PageInfo::skipped_values, which tells us where to start decoding in the input . - // It is only necessary to do this second pass if uses_custom_row_bounds is set (if the user has - // specified artifical bounds). - if (uses_custom_row_bounds) { - gpuComputePageSizes<<>>( - pages.device_ptr(), chunks, min_row, num_rows, true); - } - // ordering of pages is by input column schema, repeated across row groups. so // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like // @@ -1900,13 +1735,11 @@ void PreprocessColumnData(hostdevice_vector& pages, // Handle a specific corner case. It is possible to construct a parquet file such that // a column within a row group contains more rows than the row group itself. This may be // invalid, but we have seen instances of this in the wild, including how they were created - // using the apache parquet tools. Normally, the trim pass would handle this case quietly, - // but if we are not running the trim pass (which is most of the time) we need to cap the - // number of rows we will allocate/read from the file with the amount specified in the - // associated row group. This only applies to columns that are not children of lists as - // those may have an arbitrary number of rows in them. - if (!uses_custom_row_bounds && - !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && + // using the apache parquet tools. So we need to cap the number of rows we will + // allocate/read from the file with the amount specified in the associated row group. This + // only applies to columns that are not children of lists as those may have an arbitrary + // number of rows in them. + if (!(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && size > static_cast(num_rows)) { size = static_cast(num_rows); } @@ -1941,14 +1774,13 @@ void PreprocessColumnData(hostdevice_vector& pages, void __host__ DecodePageData(hostdevice_vector& pages, hostdevice_vector const& chunks, size_t num_rows, - size_t min_row, rmm::cuda_stream_view stream) { dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page gpuDecodePageData<<>>( - pages.device_ptr(), chunks, min_row, num_rows); + pages.device_ptr(), chunks, num_rows); } } // namespace gpu diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 0faeba7987b..81f802ff9ca 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -135,19 +135,6 @@ struct PageInfo { Encoding definition_level_encoding; // Encoding used for definition levels (data page) Encoding repetition_level_encoding; // Encoding used for repetition levels (data page) - // for nested types, we run a preprocess step in order to determine output - // column sizes. Because of this, we can jump directly to the position in the - // input data to start decoding instead of reading all of the data and discarding - // rows we don't care about. - // - // NOTE: for flat hierarchies we do not do the preprocess step, so skipped_values and - // skipped_leaf_values will always be 0. - // - // # of values skipped in the repetition/definition level stream - int skipped_values; - // # of values skipped in the actual data stream. - int skipped_leaf_values; - // nesting information (input/output) for each page int num_nesting_levels; PageNestingInfo* nesting; @@ -429,9 +416,6 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, * @param input_columns Input column information * @param output_columns Output column information * @param num_rows Maximum number of rows to read - * @param min_rows crop all rows below min_row - * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific - * bounds * @param stream Cuda stream */ void PreprocessColumnData(hostdevice_vector& pages, @@ -439,8 +423,6 @@ void PreprocessColumnData(hostdevice_vector& pages, std::vector& input_columns, std::vector& output_columns, size_t num_rows, - size_t min_row, - bool uses_custom_row_bounds, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); @@ -453,13 +435,11 @@ void PreprocessColumnData(hostdevice_vector& pages, * @param[in,out] pages All pages to be decoded * @param[in] chunks All chunks to be decoded * @param[in] num_rows Total number of rows to read - * @param[in] min_row Minimum number of rows to read * @param[in] stream CUDA stream to use, default 0 */ void DecodePageData(hostdevice_vector& pages, hostdevice_vector const& chunks, size_t num_rows, - size_t min_row, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 4685ed4bc52..4ee6a88d94a 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -540,15 +540,14 @@ class aggregate_reader_metadata { * @brief Filters and reduces down to a selection of row groups * * @param row_groups Lists of row groups to read, one per source - * @param row_start Starting row of the selection - * @param row_count Total number of rows selected * - * @return List of row group indexes and its starting row + * @return List of row group info structs and the total number of rows */ - [[nodiscard]] auto select_row_groups(std::vector> const& row_groups, - size_type& row_start, - size_type& row_count) const + [[nodiscard]] std::pair, size_type> select_row_groups( + std::vector> const& row_groups) const { + size_type row_count = 0; + if (!row_groups.empty()) { std::vector selection; CUDF_EXPECTS(row_groups.size() == per_file_metadata.size(), @@ -565,17 +564,12 @@ class aggregate_reader_metadata { row_count += get_row_group(rowgroup_idx, src_idx).num_rows; } } - return selection; + return {selection, row_count}; } - row_start = std::max(row_start, 0); - if (row_count < 0) { - row_count = static_cast( - std::min(get_num_rows(), std::numeric_limits::max())); - } - row_count = min(row_count, get_num_rows() - row_start); + row_count = static_cast( + std::min(get_num_rows(), std::numeric_limits::max())); CUDF_EXPECTS(row_count >= 0, "Invalid row count"); - CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); std::vector selection; size_type count = 0; @@ -583,14 +577,12 @@ class aggregate_reader_metadata { for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { auto const chunk_start_row = count; count += get_row_group(rg_idx, src_idx).num_rows; - if (count > row_start || count == 0) { - selection.emplace_back(rg_idx, chunk_start_row, src_idx); - } - if (count >= row_start + row_count) { break; } + selection.emplace_back(rg_idx, chunk_start_row, src_idx); + if (count >= row_count) { break; } } } - return selection; + return {selection, row_count}; } /** @@ -1350,9 +1342,7 @@ void reader::impl::allocate_nesting_info(hostdevice_vector */ void reader::impl::preprocess_columns(hostdevice_vector& chunks, hostdevice_vector& pages, - size_t min_row, - size_t total_rows, - bool uses_custom_row_bounds, + size_t num_rows, bool has_lists) { // TODO : we should be selectively preprocessing only columns that have @@ -1365,22 +1355,15 @@ void reader::impl::preprocess_columns(hostdevice_vector& c [&](std::vector& cols) { for (size_t idx = 0; idx < cols.size(); idx++) { auto& col = cols[idx]; - col.create(total_rows, _stream, _mr); + col.create(num_rows, _stream, _mr); create_columns(col.children); } }; create_columns(_output_columns); } else { // preprocess per-nesting level sizes by page - gpu::PreprocessColumnData(pages, - chunks, - _input_columns, - _output_columns, - total_rows, - min_row, - uses_custom_row_bounds, - _stream, - _mr); + gpu::PreprocessColumnData( + pages, chunks, _input_columns, _output_columns, num_rows, _stream, _mr); _stream.synchronize(); } } @@ -1391,7 +1374,6 @@ void reader::impl::preprocess_columns(hostdevice_vector& c void reader::impl::decode_page_data(hostdevice_vector& chunks, hostdevice_vector& pages, hostdevice_vector& page_nesting, - size_t min_row, size_t total_rows) { auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { @@ -1513,7 +1495,7 @@ void reader::impl::decode_page_data(hostdevice_vector& chu gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream); } - gpu::DecodePageData(pages, chunks, total_rows, min_row, _stream); + gpu::DecodePageData(pages, chunks, total_rows, _stream); pages.device_to_host(_stream); page_nesting.device_to_host(_stream); _stream.synchronize(); @@ -1605,14 +1587,10 @@ reader::impl::impl(std::vector>&& sources, _timestamp_type.id()); } -table_with_metadata reader::impl::read(size_type skip_rows, - size_type num_rows, - bool uses_custom_row_bounds, - std::vector> const& row_group_list) +table_with_metadata reader::impl::read(std::vector> const& row_group_list) { // Select only row groups required - const auto selected_row_groups = - _metadata->select_row_groups(row_group_list, skip_rows, num_rows); + const auto [selected_row_groups, num_rows] = _metadata->select_row_groups(row_group_list); table_metadata out_metadata; @@ -1761,10 +1739,10 @@ table_with_metadata reader::impl::read(size_type skip_rows, // // - for nested schemas, output buffer offset values per-page, per nesting-level for the // purposes of decoding. - preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds, has_lists); + preprocess_columns(chunks, pages, num_rows, has_lists); // decoding of column data itself - decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows); + decode_page_data(chunks, pages, page_nesting_info, num_rows); auto make_output_column = [&](column_buffer& buf, column_name_info* schema_info, int i) { auto col = make_column(buf, schema_info, _stream, _mr); @@ -1828,12 +1806,7 @@ reader::~reader() = default; // Forward to implementation table_with_metadata reader::read(parquet_reader_options const& options) { - // if the user has specified custom row bounds - bool const uses_custom_row_bounds = options.get_num_rows() >= 0 || options.get_skip_rows() != 0; - return _impl->read(options.get_skip_rows(), - options.get_num_rows(), - uses_custom_row_bounds, - options.get_row_groups()); + return _impl->read(options.get_row_groups()); } } // namespace parquet diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 97582b8ebd7..99c1a231f62 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -69,18 +69,11 @@ class reader::impl { /** * @brief Read an entire set or a subset of data and returns a set of columns * - * @param skip_rows Number of rows to skip from the start - * @param num_rows Number of rows to read - * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific - * bounds * @param row_group_indices Lists of row groups to read, one per source * * @return The set of columns along with metadata */ - table_with_metadata read(size_type skip_rows, - size_type num_rows, - bool uses_custom_row_bounds, - std::vector> const& row_group_indices); + table_with_metadata read(std::vector> const& row_group_indices); private: /** @@ -159,18 +152,13 @@ class reader::impl { * * @param chunks All chunks to be decoded * @param pages All pages to be decoded - * @param min_rows crop all rows below min_row - * @param total_rows Maximum number of rows to read - * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific - * bounds + * @param num_rows The number of rows to be decoded * @param has_lists Whether or not this data contains lists and requires * a preprocess. */ void preprocess_columns(hostdevice_vector& chunks, hostdevice_vector& pages, - size_t min_row, - size_t total_rows, - bool uses_custom_row_bounds, + size_t num_rows, bool has_lists); /** @@ -179,13 +167,11 @@ class reader::impl { * @param chunks List of column chunk descriptors * @param pages List of page information * @param page_nesting Page nesting array - * @param min_row Minimum number of rows from start * @param total_rows Number of rows to output */ void decode_page_data(hostdevice_vector& chunks, hostdevice_vector& pages, hostdevice_vector& page_nesting, - size_t min_row, size_t total_rows); /** diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 2f153c98b48..0350bfe2981 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -2505,213 +2505,6 @@ TEST_F(ParquetWriterStressTest, DeviceWriteLargeTableWithValids) CUDF_TEST_EXPECT_TABLES_EQUAL(custom_tbl.tbl->view(), expected->view()); } -TEST_F(ParquetReaderTest, UserBounds) -{ - // trying to read more rows than there are should result in - // receiving the properly capped # of rows - { - srand(31337); - auto expected = create_random_fixed_table(4, 4, false); - - auto filepath = temp_env->get_temp_filepath("TooManyRows.parquet"); - cudf_io::parquet_writer_options args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); - cudf_io::write_parquet(args); - - // attempt to read more rows than there actually are - cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(16); - auto result = cudf_io::read_parquet(read_opts); - - // we should only get back 4 rows - EXPECT_EQ(result.tbl->view().column(0).size(), 4); - } - - // trying to read past the end of the # of actual rows should result - // in empty columns. - { - srand(31337); - auto expected = create_random_fixed_table(4, 4, false); - - auto filepath = temp_env->get_temp_filepath("PastBounds.parquet"); - cudf_io::parquet_writer_options args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); - cudf_io::write_parquet(args); - - // attempt to read more rows than there actually are - cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).skip_rows(4); - auto result = cudf_io::read_parquet(read_opts); - - // we should get empty columns back - EXPECT_EQ(result.tbl->view().num_columns(), 4); - EXPECT_EQ(result.tbl->view().column(0).size(), 0); - } - - // trying to read 0 rows should result in reading the whole file - // at the moment we get back 4. when that bug gets fixed, this - // test can be flipped. - { - srand(31337); - auto expected = create_random_fixed_table(4, 4, false); - - auto filepath = temp_env->get_temp_filepath("ZeroRows.parquet"); - cudf_io::parquet_writer_options args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); - cudf_io::write_parquet(args); - - // attempt to read more rows than there actually are - cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(0); - auto result = cudf_io::read_parquet(read_opts); - - EXPECT_EQ(result.tbl->view().num_columns(), 4); - EXPECT_EQ(result.tbl->view().column(0).size(), 0); - } - - // trying to read 0 rows past the end of the # of actual rows should result - // in empty columns. - { - srand(31337); - auto expected = create_random_fixed_table(4, 4, false); - - auto filepath = temp_env->get_temp_filepath("ZeroRowsPastBounds.parquet"); - cudf_io::parquet_writer_options args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); - cudf_io::write_parquet(args); - - // attempt to read more rows than there actually are - cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) - .skip_rows(4) - .num_rows(0); - auto result = cudf_io::read_parquet(read_opts); - - // we should get empty columns back - EXPECT_EQ(result.tbl->view().num_columns(), 4); - EXPECT_EQ(result.tbl->view().column(0).size(), 0); - } -} - -TEST_F(ParquetReaderTest, UserBoundsWithNulls) -{ - // clang-format off - cudf::test::fixed_width_column_wrapper col{{1,1,1,1,1,1,1,1, 2,2,2,2,2,2,2,2, 3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4, 5,5,5,5,5,5,5,5, 6,6,6,6,6,6,6,6, 7,7,7,7,7,7,7,7, 8,8,8,8,8,8,8,8} - ,{1,1,1,0,0,0,1,1, 1,1,1,1,1,1,1,1, 0,0,0,0,0,0,0,0, 1,1,1,1,1,1,0,0, 1,0,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,0}}; - // clang-format on - cudf::table_view tbl({col}); - auto filepath = temp_env->get_temp_filepath("UserBoundsWithNulls.parquet"); - cudf_io::parquet_writer_options out_args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); - cudf_io::write_parquet(out_args); - - // skip_rows / num_rows - // clang-format off - std::vector> params{ {-1, -1}, {1, 3}, {3, -1}, - {31, -1}, {32, -1}, {33, -1}, - {31, 5}, {32, 5}, {33, 5}, - {-1, 7}, {-1, 31}, {-1, 32}, {-1, 33}, - {62, -1}, {63, -1}, - {62, 2}, {63, 1}}; - // clang-format on - for (auto p : params) { - cudf_io::parquet_reader_options read_args = - cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); - if (p.first >= 0) { read_args.set_skip_rows(p.first); } - if (p.second >= 0) { read_args.set_num_rows(p.second); } - auto result = cudf_io::read_parquet(read_args); - - p.first = p.first < 0 ? 0 : p.first; - p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; - std::vector slice_indices{p.first, p.first + p.second}; - auto expected = cudf::slice(col, slice_indices); - - CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); - } -} - -TEST_F(ParquetReaderTest, UserBoundsWithNullsLarge) -{ - constexpr int num_rows = 30 * 1000000; - - std::mt19937 gen(6747); - std::bernoulli_distribution bn(0.7f); - auto valids = - cudf::detail::make_counting_transform_iterator(0, [&](int index) { return bn(gen); }); - auto values = thrust::make_counting_iterator(0); - - cudf::test::fixed_width_column_wrapper col(values, values + num_rows, valids); - - // this file will have row groups of 1,000,000 each - cudf::table_view tbl({col}); - auto filepath = temp_env->get_temp_filepath("UserBoundsWithNullsLarge.parquet"); - cudf_io::parquet_writer_options out_args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); - cudf_io::write_parquet(out_args); - - // skip_rows / num_rows - // clang-format off - std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {1613470, -1}, {1999999, -1}, - {31, 1}, {32, 1}, {33, 1}, - // deliberately span some row group boundaries - {999000, 1001}, {999000, 2000}, {2999999, 2}, {13999997, -1}, - {16785678, 3}, {22996176, 31}, - {24001231, 17}, {29000001, 989999}, {29999999, 1} }; - // clang-format on - for (auto p : params) { - cudf_io::parquet_reader_options read_args = - cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); - if (p.first >= 0) { read_args.set_skip_rows(p.first); } - if (p.second >= 0) { read_args.set_num_rows(p.second); } - auto result = cudf_io::read_parquet(read_args); - - p.first = p.first < 0 ? 0 : p.first; - p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; - std::vector slice_indices{p.first, p.first + p.second}; - auto expected = cudf::slice(col, slice_indices); - - CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); - } -} - -TEST_F(ParquetReaderTest, ListUserBoundsWithNullsLarge) -{ - constexpr int num_rows = 5 * 1000000; - auto colp = make_parquet_list_col(0, num_rows, 5, 8, true); - cudf::column_view col = *colp; - - // this file will have row groups of 1,000,000 each - cudf::table_view tbl({col}); - auto filepath = temp_env->get_temp_filepath("ListUserBoundsWithNullsLarge.parquet"); - cudf_io::parquet_writer_options out_args = - cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); - cudf_io::write_parquet(out_args); - - // skip_rows / num_rows - // clang-format off - std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {161470, -1}, {4499997, -1}, - {31, 1}, {32, 1}, {33, 1}, - // deliberately span some row group boundaries - {999000, 1001}, {999000, 2000}, {2999999, 2}, - {1678567, 3}, {4299676, 31}, - {4001231, 17}, {1900000, 989999}, {4999999, 1} }; - // clang-format on - for (auto p : params) { - cudf_io::parquet_reader_options read_args = - cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); - if (p.first >= 0) { read_args.set_skip_rows(p.first); } - if (p.second >= 0) { read_args.set_num_rows(p.second); } - auto result = cudf_io::read_parquet(read_args); - - p.first = p.first < 0 ? 0 : p.first; - p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; - std::vector slice_indices{p.first, p.first + p.second}; - auto expected = cudf::slice(col, slice_indices); - - CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); - } -} - TEST_F(ParquetReaderTest, ReorderedColumns) { {