diff --git a/cpp/src/io/parquet/delta_binary.cuh b/cpp/src/io/parquet/delta_binary.cuh
index e3b23f4c0a0..ccc28791071 100644
--- a/cpp/src/io/parquet/delta_binary.cuh
+++ b/cpp/src/io/parquet/delta_binary.cuh
@@ -39,15 +39,15 @@ namespace cudf::io::parquet::detail {
 // per mini-block. While encoding, the lowest delta value is subtracted from all the deltas in the
 // block to ensure that all encoded values are positive. The deltas for each mini-block are bit
 // packed using the same encoding as the RLE/Bit-Packing Hybrid encoder.
-//
-// DELTA_BYTE_ARRAY encoding (incremental encoding or front compression), is used for BYTE_ARRAY
-// columns. For each element in a sequence of strings, a prefix length from the preceding string
-// and a suffix is stored. The prefix lengths are DELTA_BINARY_PACKED encoded. The suffixes are
-// encoded with DELTA_LENGTH_BYTE_ARRAY encoding, which is a DELTA_BINARY_PACKED list of suffix
-// lengths, followed by the concatenated suffix data.
-// we decode one mini-block at a time. max mini-block size seen is 64.
-constexpr int delta_rolling_buf_size = 128;
+// The largest mini-block size we can currently support.
+constexpr int max_delta_mini_block_size = 64;
+
+// The first pass decodes `values_per_mb` values, and then the second pass does another
+// batch of size `values_per_mb`. The largest value for values_per_miniblock among the
+// major writers seems to be 64, so 2 * 64 should be good. We save the first value separately
+// since it is not encoded in the first mini-block.
+constexpr int delta_rolling_buf_size = 2 * max_delta_mini_block_size;
 
 /**
  * @brief Read a ULEB128 varint integer
@@ -90,7 +90,8 @@ struct delta_binary_decoder {
   uleb128_t mini_block_count;  // usually 4, chosen such that block_size/mini_block_count is a
                                // multiple of 32
   uleb128_t value_count;       // total values encoded in the block
-  zigzag128_t last_value;      // last value decoded, initialized to first_value from header
+  zigzag128_t first_value;     // initial value, stored in the header
+  zigzag128_t last_value;      // last value decoded
 
   uint32_t values_per_mb;      // block_size / mini_block_count, must be multiple of 32
   uint32_t current_value_idx;  // current value index, initialized to 0 at start of block
@@ -102,6 +103,13 @@ struct delta_binary_decoder {
 
   uleb128_t value[delta_rolling_buf_size];  // circular buffer of delta values
 
+  // returns the value stored in the `value` array at index
+  // `rolling_index(idx)`. If `idx` is `0`, then return `first_value`.
+  constexpr zigzag128_t value_at(size_type idx)
+  {
+    return idx == 0 ? first_value : value[rolling_index<delta_rolling_buf_size>(idx)];
+  }
+
   // returns the number of values encoded in the block data. when all_values is true,
   // account for the first value in the header. otherwise just count the values encoded
   // in the mini-block data.
@@ -145,7 +153,8 @@ struct delta_binary_decoder {
     block_size       = get_uleb128(d_start, d_end);
     mini_block_count = get_uleb128(d_start, d_end);
     value_count      = get_uleb128(d_start, d_end);
-    last_value       = get_zz128(d_start, d_end);
+    first_value      = get_zz128(d_start, d_end);
+    last_value       = first_value;
 
     current_value_idx = 0;
     values_per_mb     = block_size / mini_block_count;
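Editor's note: the block header parsed above is four varints, `<block size> <mini-block count> <total value count> <first value>`, where the first three are ULEB128 and the first value is zigzag encoded. A minimal host-side sketch of those two primitives follows; the `_sketch` names are illustrative, and the real `get_uleb128`/`get_zz128` device functions operate on 128-bit types rather than `uint64_t`.

```cpp
#include <cstdint>

// ULEB128: 7 payload bits per byte, high bit set on every byte except the last.
uint64_t get_uleb128_sketch(uint8_t const*& cur, uint8_t const* end)
{
  uint64_t v = 0;
  for (int shift = 0; cur < end; shift += 7) {
    uint8_t const byte = *cur++;
    v |= static_cast<uint64_t>(byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) { break; }  // high bit clear: last byte of the varint
  }
  return v;
}

// Zigzag maps signed to unsigned so small magnitudes stay small:
// 0, -1, 1, -2, 2 encode as 0, 1, 2, 3, 4.
int64_t get_zz128_sketch(uint8_t const*& cur, uint8_t const* end)
{
  uint64_t const u = get_uleb128_sketch(cur, end);
  return static_cast<int64_t>(u >> 1) ^ -static_cast<int64_t>(u & 1);
}
```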
@@ -179,6 +188,28 @@ struct delta_binary_decoder {
     }
   }
 
+  // given start/end pointers in the data, find the end of the binary encoded block. when done,
+  // `this` will be initialized with the correct start and end positions. returns the end, which is
+  // start of data/next block. should only be called from thread 0.
+  inline __device__ uint8_t const* find_end_of_block(uint8_t const* start, uint8_t const* end)
+  {
+    // read block header
+    init_binary_block(start, end);
+
+    // test for no encoded values. a single value will be in the block header.
+    if (value_count <= 1) { return block_start; }
+
+    // read mini-block headers and skip over data
+    while (current_value_idx < num_encoded_values(false)) {
+      setup_next_mini_block(false);
+    }
+    // calculate the correct end of the block
+    auto const* const new_end = cur_mb == 0 ? block_start : cur_mb_start;
+    // re-init block with correct end
+    init_binary_block(start, new_end);
+    return new_end;
+  }
+
   // decode the current mini-batch of deltas, and convert to values.
   // called by all threads in a warp, currently only one warp supported.
   inline __device__ void calc_mini_block_values(int lane_id)
@@ -186,12 +217,9 @@ struct delta_binary_decoder {
     using cudf::detail::warp_size;
     if (current_value_idx >= value_count) { return; }
 
-    // need to save first value from header on first pass
+    // need to account for the first value from header on first pass
     if (current_value_idx == 0) {
-      if (lane_id == 0) {
-        current_value_idx++;
-        value[0] = last_value;
-      }
+      if (lane_id == 0) { current_value_idx++; }
       __syncwarp();
       if (current_value_idx >= value_count) { return; }
     }
diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index a783b489c02..0c53877f7c7 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -449,8 +449,13 @@ __global__ void __launch_bounds__(decode_block_size)
   int out_thread0;
   [[maybe_unused]] null_count_back_copier _{s, t};
 
-  if (!setupLocalPageInfo(
-        s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{KERNEL_MASK_GENERAL}, true)) {
+  if (!setupLocalPageInfo(s,
+                          &pages[page_idx],
+                          chunks,
+                          min_row,
+                          num_rows,
+                          mask_filter{decode_kernel_mask::GENERAL},
+                          true)) {
     return;
   }
 
@@ -486,6 +491,7 @@ __global__ void __launch_bounds__(decode_block_size)
       target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
       if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
     }
+    // TODO(ets): see if this sync can be removed
    __syncthreads();
    if (t < 32) {
      // decode repetition and definition levels.
@@ -603,7 +609,7 @@ __global__ void __launch_bounds__(decode_block_size)
 }
 
 struct mask_tform {
-  __device__ uint32_t operator()(PageInfo const& p) { return p.kernel_mask; }
+  __device__ uint32_t operator()(PageInfo const& p) { return static_cast<uint32_t>(p.kernel_mask); }
 };
 
 }  // anonymous namespace
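Editor's note: `mask_tform` exists so the reader can reduce the per-page `decode_kernel_mask` values into one aggregate bitmask and launch only the kernels that some page actually needs. A hypothetical host-side analogue of that reduction (cuDF performs the equivalent on the GPU with thrust; the local enum here is an illustration):

```cpp
#include <cstdint>
#include <numeric>
#include <vector>

// Local illustration of the mask bits; the real enum lives in parquet_gpu.hpp.
enum class decode_kernel_mask : uint32_t {
  NONE = 0, GENERAL = 1 << 0, STRING = 1 << 1, DELTA_BINARY = 1 << 2, DELTA_BYTE_ARRAY = 1 << 3
};

// OR together every page's mask; a set bit means "launch that decode kernel".
uint32_t aggregate_kernel_mask(std::vector<decode_kernel_mask> const& page_masks)
{
  return std::accumulate(
    page_masks.begin(), page_masks.end(), 0u, [](uint32_t acc, decode_kernel_mask m) {
      return acc | static_cast<uint32_t>(m);
    });
}
```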
diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh
index ab1cc68923d..4db9bd3904b 100644
--- a/cpp/src/io/parquet/page_decode.cuh
+++ b/cpp/src/io/parquet/page_decode.cuh
@@ -991,8 +991,15 @@ struct all_types_filter {
  * @brief Functor for setupLocalPageInfo that takes a mask of allowed types.
  */
 struct mask_filter {
-  int mask;
-  __device__ inline bool operator()(PageInfo const& page) { return (page.kernel_mask & mask) != 0; }
+  uint32_t mask;
+
+  __device__ mask_filter(uint32_t m) : mask(m) {}
+  __device__ mask_filter(decode_kernel_mask m) : mask(static_cast<uint32_t>(m)) {}
+
+  __device__ inline bool operator()(PageInfo const& page)
+  {
+    return BitAnd(mask, page.kernel_mask) != 0;
+  }
 };
 
 /**
@@ -1306,6 +1313,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
           s->dict_run  = 0;
         } break;
       case Encoding::DELTA_BINARY_PACKED:
+      case Encoding::DELTA_BYTE_ARRAY:
         // nothing to do, just don't error
         break;
       default: {
diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu
index bb5e5066b69..bc025c6fc3e 100644
--- a/cpp/src/io/parquet/page_delta_decode.cu
+++ b/cpp/src/io/parquet/page_delta_decode.cu
@@ -27,6 +27,277 @@ namespace cudf::io::parquet::detail {
 
 namespace {
 
+constexpr int decode_block_size = 128;
+
+// DELTA_BYTE_ARRAY encoding (incremental encoding or front compression) is used for BYTE_ARRAY
+// columns. For each element in a sequence of strings, a prefix length from the preceding string
+// and a suffix is stored. The prefix lengths are DELTA_BINARY_PACKED encoded. The suffixes are
+// encoded with DELTA_LENGTH_BYTE_ARRAY encoding, which is a DELTA_BINARY_PACKED list of suffix
+// lengths, followed by the concatenated suffix data.
+struct delta_byte_array_decoder {
+  uint8_t const* last_string;       // pointer to last decoded string...needed for its prefix
+  uint8_t const* suffix_char_data;  // pointer to the start of character data
+
+  uint8_t* temp_buf;         // buffer used when skipping values
+  uint32_t start_val;        // decoded strings up to this index will be dumped to temp_buf
+  uint32_t last_string_len;  // length of the last decoded string
+
+  delta_binary_decoder prefixes;  // state of decoder for prefix lengths
+  delta_binary_decoder suffixes;  // state of decoder for suffix lengths
+
+  // initialize the prefixes and suffixes blocks
+  __device__ void init(uint8_t const* start, uint8_t const* end, uint32_t start_idx, uint8_t* temp)
+  {
+    auto const* suffix_start = prefixes.find_end_of_block(start, end);
+    suffix_char_data         = suffixes.find_end_of_block(suffix_start, end);
+    last_string              = nullptr;
+    temp_buf                 = temp;
+    start_val                = start_idx;
+  }
+
+  // kind of like an inclusive scan for strings. takes prefix_len bytes from preceding
+  // string and prepends to the suffix we've already copied into place. called from
+  // within loop over values_in_mb, so this only needs to handle a single warp worth of data
+  // at a time.
+  __device__ void string_scan(uint8_t* strings_out,
+                              uint8_t const* last_string,
+                              uint32_t start_idx,
+                              uint32_t end_idx,
+                              uint32_t offset,
+                              uint32_t lane_id)
+  {
+    using cudf::detail::warp_size;
+
+    // let p(n) === length(prefix(string_n))
+    //
+    // if p(n-1) > p(n), then string_n can be completed when string_n-2 is completed. likewise if
+    // p(m) > p(n), then string_n can be completed with string_m-1. however, if p(m) < p(n), then m
+    // is a "blocker" for string_n; string_n can be completed only after string_m is.
+    //
+    // we will calculate the nearest blocking position for each lane, and then fill in string_0. we
+    // then iterate, finding all lanes that have had their "blocker" filled in and completing them.
+    // when all lanes are filled in, we return. this will still hit the worst case if
+    // p(n-1) < p(n) for all n
+    __shared__ __align__(8) int64_t prefix_lens[warp_size];
+    __shared__ __align__(8) uint8_t const* offsets[warp_size];
+
+    uint32_t const ln_idx   = start_idx + lane_id;
+    uint64_t prefix_len     = ln_idx < end_idx ? prefixes.value_at(ln_idx) : 0;
+    uint8_t* const lane_out = ln_idx < end_idx ? strings_out + offset : nullptr;
+
+    prefix_lens[lane_id] = prefix_len;
+    offsets[lane_id]     = lane_out;
+
+    // if all prefix_len's are zero, then there's nothing to do
+    if (__all_sync(0xffff'ffff, prefix_len == 0)) { return; }
+
+    // find a neighbor to the left that has a prefix length less than this lane. once that
+    // neighbor is complete, this lane can be completed.
+    int blocker = lane_id - 1;
+    while (blocker > 0 && prefix_lens[blocker] != 0 && prefix_len <= prefix_lens[blocker]) {
+      blocker--;
+    }
+
+    // fill in lane 0 (if necessary)
+    if (lane_id == 0 && prefix_len > 0) {
+      memcpy(lane_out, last_string, prefix_len);
+      prefix_lens[0] = prefix_len = 0;
+    }
+    __syncwarp();
+
+    // now fill in blockers until done
+    for (uint32_t i = 1; i < warp_size && i + start_idx < end_idx; i++) {
+      if (prefix_len != 0 && prefix_lens[blocker] == 0 && lane_out != nullptr) {
+        memcpy(lane_out, offsets[blocker], prefix_len);
+        prefix_lens[lane_id] = prefix_len = 0;
+      }
+
+      // check for finished
+      if (__all_sync(0xffff'ffff, prefix_len == 0)) { return; }
+    }
+  }
+
+  // calculate a mini-batch of string values, writing the results to
+  // `strings_out`. starting at global index `start_idx` and decoding
+  // up to `num_values` strings.
+  // called by all threads in a warp. used for strings <= 32 chars.
+  // returns number of bytes written
+  __device__ size_t calculate_string_values(uint8_t* strings_out,
+                                            uint32_t start_idx,
+                                            uint32_t num_values,
+                                            uint32_t lane_id)
+  {
+    using cudf::detail::warp_size;
+    using WarpScan = cub::WarpScan<uint64_t>;
+    __shared__ WarpScan::TempStorage scan_temp;
+
+    if (start_idx >= suffixes.value_count) { return 0; }
+    auto end_idx = start_idx + min(suffixes.values_per_mb, num_values);
+    end_idx      = min(end_idx, static_cast<uint32_t>(suffixes.value_count));
+
+    auto p_strings_out = strings_out;
+    auto p_temp_out    = temp_buf;
+
+    auto copy_batch = [&](uint8_t* out, uint32_t idx, uint32_t end) {
+      uint32_t const ln_idx = idx + lane_id;
+
+      // calculate offsets into suffix data
+      uint64_t const suffix_len = ln_idx < end ? suffixes.value_at(ln_idx) : 0;
+      uint64_t suffix_off       = 0;
+      WarpScan(scan_temp).ExclusiveSum(suffix_len, suffix_off);
+
+      // calculate offsets into string data
+      uint64_t const prefix_len = ln_idx < end ? prefixes.value_at(ln_idx) : 0;
+      uint64_t const string_len = prefix_len + suffix_len;
+
+      // get offset into output for each lane
+      uint64_t string_off, warp_total;
+      WarpScan(scan_temp).ExclusiveSum(string_len, string_off, warp_total);
+      auto const so_ptr = out + string_off;
+
+      // copy suffixes into string data
+      if (ln_idx < end) { memcpy(so_ptr + prefix_len, suffix_char_data + suffix_off, suffix_len); }
+      __syncwarp();
+
+      // copy prefixes into string data.
+      string_scan(out, last_string, idx, end, string_off, lane_id);
+
+      // save the position of the last computed string. this will be used in
+      // the next iteration to reconstruct the string in lane 0.
+      if (ln_idx == end - 1 || (ln_idx < end && lane_id == 31)) {
+        // set last_string to this lane's string
+        last_string     = out + string_off;
+        last_string_len = string_len;
+        // and consume used suffix_char_data
+        suffix_char_data += suffix_off + suffix_len;
+      }
+
+      return warp_total;
+    };
+
+    uint64_t string_total = 0;
+    for (int idx = start_idx; idx < end_idx; idx += warp_size) {
+      auto const n_in_batch = min(warp_size, end_idx - idx);
+      // account for the case where start_val occurs in the middle of this batch
+      if (idx < start_val && idx + n_in_batch > start_val) {
+        // dump idx...start_val into temp_buf
+        copy_batch(p_temp_out, idx, start_val);
+        __syncwarp();
+
+        // start_val...idx + n_in_batch into strings_out
+        auto nbytes = copy_batch(p_strings_out, start_val, idx + n_in_batch);
+        p_strings_out += nbytes;
+        string_total = nbytes;
+      } else {
+        if (idx < start_val) {
+          p_temp_out += copy_batch(p_temp_out, idx, end_idx);
+        } else {
+          auto nbytes = copy_batch(p_strings_out, idx, end_idx);
+          p_strings_out += nbytes;
+          string_total += nbytes;
+        }
+      }
+      __syncwarp();
+    }
+
+    return string_total;
+  }
+
+  // character parallel version of calculate_string_values(). This is faster for strings longer
+  // than 32 chars.
+  __device__ size_t calculate_string_values_cp(uint8_t* strings_out,
+                                               uint32_t start_idx,
+                                               uint32_t num_values,
+                                               uint32_t lane_id)
+  {
+    using cudf::detail::warp_size;
+    __shared__ __align__(8) uint8_t* so_ptr;
+
+    if (start_idx >= suffixes.value_count) { return 0; }
+    auto end_idx = start_idx + min(suffixes.values_per_mb, num_values);
+    end_idx      = min(end_idx, static_cast<uint32_t>(suffixes.value_count));
+
+    if (lane_id == 0) { so_ptr = start_idx < start_val ? temp_buf : strings_out; }
+    __syncwarp();
+
+    uint64_t string_total = 0;
+    for (int idx = start_idx; idx < end_idx; idx++) {
+      uint64_t const suffix_len = suffixes.value_at(idx);
+      uint64_t const prefix_len = prefixes.value_at(idx);
+      uint64_t const string_len = prefix_len + suffix_len;
+
+      // copy prefix and suffix data into current strings_out position
+      // for longer strings use a 4-byte version stolen from gather_chars_fn_string_parallel.
+      if (string_len > 64) {
+        if (prefix_len > 0) { wideStrcpy(so_ptr, last_string, prefix_len, lane_id); }
+        if (suffix_len > 0) {
+          wideStrcpy(so_ptr + prefix_len, suffix_char_data, suffix_len, lane_id);
+        }
+      } else {
+        for (int i = lane_id; i < string_len; i += warp_size) {
+          so_ptr[i] = i < prefix_len ? last_string[i] : suffix_char_data[i - prefix_len];
+        }
+      }
+      __syncwarp();
+
+      if (idx >= start_val) { string_total += string_len; }
+
+      if (lane_id == 0) {
+        last_string     = so_ptr;
+        last_string_len = string_len;
+        suffix_char_data += suffix_len;
+        if (idx == start_val - 1) {
+          so_ptr = strings_out;
+        } else {
+          so_ptr += string_len;
+        }
+      }
+      __syncwarp();
+    }
+
+    return string_total;
+  }
+
+  // dump strings before start_val to temp buf
+  __device__ void skip(bool use_char_ll)
+  {
+    using cudf::detail::warp_size;
+    int const t       = threadIdx.x;
+    int const lane_id = t % warp_size;
+
+    // is this even necessary? return if asking to skip the whole block.
+    if (start_val >= prefixes.num_encoded_values(true)) { return; }
+
+    // prefixes and suffixes will have the same parameters (it's checked earlier)
+    auto const batch_size = prefixes.values_per_mb;
+
+    uint32_t skip_pos = 0;
+    while (prefixes.current_value_idx < start_val) {
+      // warp 0 gets prefixes and warp 1 gets suffixes
+      auto* const db = t < 32 ? &prefixes : &suffixes;
+
+      // this will potentially decode past start_val, but that's ok
+      if (t < 64) { db->decode_batch(); }
+      __syncthreads();
+
+      // warp 0 decodes the batch.
+      if (t < 32) {
+        auto const num_to_decode = min(batch_size, start_val - skip_pos);
+        auto const bytes_written =
+          use_char_ll ? calculate_string_values_cp(temp_buf, skip_pos, num_to_decode, lane_id)
+                      : calculate_string_values(temp_buf, skip_pos, num_to_decode, lane_id);
+        // store last_string someplace safe in temp buffer
+        if (t == 0) {
+          memcpy(temp_buf + bytes_written, last_string, last_string_len);
+          last_string = temp_buf + bytes_written;
+        }
+      }
+      skip_pos += prefixes.values_per_mb;
+      __syncthreads();
+    }
+  }
+};
+
 // Decode page data that is DELTA_BINARY_PACKED encoded. This encoding is
 // only used for int32 and int64 physical types (and appears to only be used
 // with V2 page headers; see https://www.mail-archive.com/dev@parquet.apache.org/msg11826.html).
@@ -52,13 +323,9 @@ __global__ void __launch_bounds__(96)
   auto* const db = &db_state;
   [[maybe_unused]] null_count_back_copier _{s, t};
 
-  if (!setupLocalPageInfo(s,
-                          &pages[page_idx],
-                          chunks,
-                          min_row,
-                          num_rows,
-                          mask_filter{KERNEL_MASK_DELTA_BINARY},
-                          true)) {
+  auto const mask = decode_kernel_mask::DELTA_BINARY;
+  if (!setupLocalPageInfo(
+        s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
     return;
   }
 
@@ -78,6 +345,10 @@ __global__ void __launch_bounds__(96)
   __syncthreads();
 
   auto const batch_size = db->values_per_mb;
+  if (batch_size > max_delta_mini_block_size) {
+    set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
+    return;
+  }
 
   // if skipped_leaf_values is non-zero, then we need to decode up to the first mini-block
   // that has a value we need.
@@ -93,6 +364,7 @@ __global__ void __launch_bounds__(96)
     } else {  // warp2
       target_pos = min(s->nz_count, src_pos + batch_size);
     }
+    // TODO(ets): see if this sync can be removed
    __syncthreads();
 
    // warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
@@ -125,23 +397,12 @@ __global__ void __launch_bounds__(96)
       // place value for this thread
       if (dst_pos >= 0 && sp < target_pos) {
         void* const dst = nesting_info_base[leaf_level_index].data_out + dst_pos * s->dtype_len;
+        auto const val  = db->value_at(sp + skipped_leaf_values);
         switch (s->dtype_len) {
-          case 1:
-            *static_cast<int8_t*>(dst) =
-              db->value[rolling_index<delta_rolling_buf_size>(sp + skipped_leaf_values)];
-            break;
-          case 2:
-            *static_cast<int16_t*>(dst) =
-              db->value[rolling_index<delta_rolling_buf_size>(sp + skipped_leaf_values)];
-            break;
-          case 4:
-            *static_cast<int32_t*>(dst) =
-              db->value[rolling_index<delta_rolling_buf_size>(sp + skipped_leaf_values)];
-            break;
-          case 8:
-            *static_cast<int64_t*>(dst) =
-              db->value[rolling_index<delta_rolling_buf_size>(sp + skipped_leaf_values)];
-            break;
+          case 1: *static_cast<int8_t*>(dst) = val; break;
+          case 2: *static_cast<int16_t*>(dst) = val; break;
+          case 4: *static_cast<int32_t*>(dst) = val; break;
+          case 8: *static_cast<int64_t*>(dst) = val; break;
         }
       }
     }
@@ -154,6 +415,164 @@ __global__ void __launch_bounds__(96)
   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
 
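Editor's note: before reading the warp-parallel kernel that follows, it may help to see the sequential semantics of DELTA_BYTE_ARRAY front coding. A reference sketch (not cuDF code), assuming each `prefix_lens[i]` does not exceed the previous string's length:

```cpp
#include <string>
#include <vector>

// Each value is `prefix_lens[i]` leading bytes of the previous value, followed
// by that value's own suffix.
std::vector<std::string> decode_delta_byte_array(std::vector<size_t> const& prefix_lens,
                                                 std::vector<std::string> const& suffixes)
{
  std::vector<std::string> out;
  std::string prev;
  for (size_t i = 0; i < suffixes.size(); ++i) {
    std::string s = prev.substr(0, prefix_lens[i]) + suffixes[i];
    out.push_back(s);
    prev = std::move(s);
  }
  return out;
}

// Example: prefix_lens {0, 5, 6} with suffixes {"apple", "sauce", ""} decode to
// "apple", "applesauce", "apples".
```

The serial dependence on `prev` is exactly what the `string_scan` "blocker" scheme above parallelizes across a warp.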
+// Decode page data that is DELTA_BYTE_ARRAY packed. This encoding consists of a DELTA_BINARY_PACKED
+// array of prefix lengths, followed by a DELTA_BINARY_PACKED array of suffix lengths, followed by
+// the suffixes (technically the suffixes are DELTA_LENGTH_BYTE_ARRAY encoded). The latter two can
+// be used to create an offsets array for the suffix data, but then this needs to be combined with
+// the prefix lengths to do the final decode for each value. Because the lengths of the prefixes
+// and suffixes are not encoded in the header, we're going to have to first do a quick pass through
+// them to find the start/end of each structure.
+template <typename level_t>
+__global__ void __launch_bounds__(decode_block_size)
+  gpuDecodeDeltaByteArray(PageInfo* pages,
+                          device_span<ColumnChunkDesc const> chunks,
+                          size_t min_row,
+                          size_t num_rows,
+                          int32_t* error_code)
+{
+  using cudf::detail::warp_size;
+  __shared__ __align__(16) delta_byte_array_decoder db_state;
+  __shared__ __align__(16) page_state_s state_g;
+  __shared__ __align__(16) page_state_buffers_s<delta_rolling_buf_size, 0, 0> state_buffers;
+
+  page_state_s* const s = &state_g;
+  auto* const sb        = &state_buffers;
+  int const page_idx    = blockIdx.x;
+  int const t           = threadIdx.x;
+  int const lane_id     = t % warp_size;
+  auto* const prefix_db = &db_state.prefixes;
+  auto* const suffix_db = &db_state.suffixes;
+  auto* const dba       = &db_state;
+  [[maybe_unused]] null_count_back_copier _{s, t};
+
+  auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
+  if (!setupLocalPageInfo(
+        s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
+    return;
+  }
+
+  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
+
+  // choose a character parallel string copy when the average string is longer than a warp
+  auto const use_char_ll = (s->page.str_bytes / s->page.num_valids) > cudf::detail::warp_size;
+
+  // copying logic from gpuDecodePageData.
+  PageNestingDecodeInfo const* nesting_info_base = s->nesting_info;
+
+  __shared__ level_t rep[delta_rolling_buf_size];  // circular buffer of repetition level values
+  __shared__ level_t def[delta_rolling_buf_size];  // circular buffer of definition level values
+
+  // skipped_leaf_values will always be 0 for flat hierarchies.
+  uint32_t const skipped_leaf_values = s->page.skipped_leaf_values;
+
+  if (t == 0) {
+    // initialize the prefixes and suffixes blocks
+    dba->init(s->data_start, s->data_end, s->page.start_val, s->page.temp_string_buf);
+  }
+  __syncthreads();
+
+  // assert that prefix and suffix have same mini-block size
+  if (prefix_db->values_per_mb != suffix_db->values_per_mb or
+      prefix_db->block_size != suffix_db->block_size or
+      prefix_db->value_count != suffix_db->value_count) {
+    set_error(static_cast<int32_t>(decode_error::DELTA_PARAM_MISMATCH), error_code);
+    return;
+  }
+
+  // pointer to location to output final strings
+  int const leaf_level_index = s->col.max_nesting_depth - 1;
+  auto strings_data          = nesting_info_base[leaf_level_index].string_out;
+
+  auto const batch_size = prefix_db->values_per_mb;
+  if (batch_size > max_delta_mini_block_size) {
+    set_error(static_cast<int32_t>(decode_error::DELTA_PARAMS_UNSUPPORTED), error_code);
+    return;
+  }
+
+  // if this is a bounds page and nested, then we need to skip up front. non-nested will work
+  // its way through the page.
+  int string_pos          = has_repetition ? s->page.start_val : 0;
+  auto const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
+  if (is_bounds_pg && string_pos > 0) { dba->skip(use_char_ll); }
+
+  while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
+    uint32_t target_pos;
+    uint32_t const src_pos = s->src_pos;
+
+    if (t < 3 * warp_size) {  // warp 0..2
+      target_pos = min(src_pos + 2 * batch_size, s->nz_count + s->first_row + batch_size);
+    } else {  // warp 3
+      target_pos = min(s->nz_count, src_pos + batch_size);
+    }
+    // TODO(ets): see if this sync can be removed
+    __syncthreads();
+
+    // warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of prefixes, warp 2 will
+    // unpack a mini-batch of suffixes. warp3 waits one cycle for warps 0-2 to produce a batch, and
+    // then stuffs values into the proper location in the output.
+    if (t < warp_size) {
+      // decode repetition and definition levels.
+      // - update validity vectors
+      // - updates offsets (for nested columns)
+      // - produces non-NULL value indices in s->nz_idx for subsequent decoding
+      gpuDecodeLevels<delta_rolling_buf_size>(s, sb, target_pos, rep, def, t);
+
+    } else if (t < 2 * warp_size) {
+      // warp 1
+      prefix_db->decode_batch();
+
+    } else if (t < 3 * warp_size) {
+      // warp 2
+      suffix_db->decode_batch();
+
+    } else if (src_pos < target_pos) {
+      // warp 3
+
+      int const nproc = min(batch_size, s->page.end_val - string_pos);
+      strings_data += use_char_ll
+                        ? dba->calculate_string_values_cp(strings_data, string_pos, nproc, lane_id)
+                        : dba->calculate_string_values(strings_data, string_pos, nproc, lane_id);
+      string_pos += nproc;
+
+      // process the mini-block in batches of 32
+      for (uint32_t sp = src_pos + lane_id; sp < src_pos + batch_size; sp += 32) {
+        // the position in the output column/buffer
+        int dst_pos = sb->nz_idx[rolling_index<delta_rolling_buf_size>(sp)];
+
+        // handle skip_rows here. flat hierarchies can just skip up to first_row.
+        if (!has_repetition) { dst_pos -= s->first_row; }
+
+        if (dst_pos >= 0 && sp < target_pos) {
+          auto const offptr =
+            reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out) + dst_pos;
+          auto const src_idx = sp + skipped_leaf_values;
+          *offptr            = prefix_db->value_at(src_idx) + suffix_db->value_at(src_idx);
+        }
+        __syncwarp();
+      }
+
+      if (lane_id == 0) { s->src_pos = src_pos + batch_size; }
+    }
+
+    __syncthreads();
+  }
+
+  // now turn array of lengths into offsets
+  int value_count = nesting_info_base[leaf_level_index].value_count;
+
+  // if no repetition we haven't calculated start/end bounds and instead just skipped
+  // values until we reach first_row. account for that here.
+  if (!has_repetition) { value_count -= s->first_row; }
+
+  auto const offptr = reinterpret_cast<size_type*>(nesting_info_base[leaf_level_index].data_out);
+  block_excl_sum<decode_block_size>(offptr, value_count, s->page.str_offset);
+
+  if (t == 0 and s->error != 0) {
+    cuda::atomic_ref<int32_t, cuda::thread_scope_device> ref{*error_code};
+    ref.fetch_or(s->error, cuda::std::memory_order_relaxed);
+  }
+}
+
 }  // anonymous namespace
 
 /**
@@ -181,4 +600,29 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages
   }
 }
 
+/**
+ * @copydoc cudf::io::parquet::gpu::DecodeDeltaByteArray
+ */
+void __host__ DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
+                                   cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                                   size_t num_rows,
+                                   size_t min_row,
+                                   int level_type_size,
+                                   int32_t* error_code,
+                                   rmm::cuda_stream_view stream)
+{
+  CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
+
+  dim3 const dim_block(decode_block_size, 1);
+  dim3 const dim_grid(pages.size(), 1);  // 1 threadblock per page
+
+  if (level_type_size == 1) {
+    gpuDecodeDeltaByteArray<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
+      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+  } else {
+    gpuDecodeDeltaByteArray<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
+      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+  }
+}
+
 }  // namespace cudf::io::parquet::detail
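Editor's note: the kernel above finishes by converting per-row string lengths into the offsets a cudf strings column expects (`block_excl_sum`). A scalar model of that fix-up, as a sketch rather than the device code:

```cpp
#include <numeric>
#include <vector>

// The decode loop writes string *lengths* into the offsets column; an exclusive
// prefix sum seeded with the page's starting str_offset turns them into offsets.
void lengths_to_offsets(std::vector<int>& vals, int str_offset)
{
  std::exclusive_scan(vals.begin(), vals.end(), vals.begin(), str_offset);
}

// Example: lengths {5, 5, 1} with str_offset 0 become offsets {0, 5, 10}.
```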
*/ +#include "delta_binary.cuh" #include "page_decode.cuh" #include "page_string_utils.cuh" #include +#include #include +#include +#include + +#include + namespace cudf::io::parquet::detail { namespace { -constexpr int preprocess_block_size = 512; -constexpr int decode_block_size = 128; -constexpr int rolling_buf_size = decode_block_size * 2; -constexpr int preproc_buf_size = LEVEL_DECODE_BUF_SIZE; +constexpr int preprocess_block_size = 512; +constexpr int decode_block_size = 128; +constexpr int delta_preproc_block_size = 64; +constexpr int rolling_buf_size = decode_block_size * 2; +constexpr int preproc_buf_size = LEVEL_DECODE_BUF_SIZE; /** * @brief Compute the start and end page value bounds for this page @@ -450,12 +458,107 @@ __device__ size_t totalPlainEntriesSize(uint8_t const* data, } /** - * @brief Kernel for computing string page output size information. + * @brief Compute string size information for DELTA_BYTE_ARRAY encoded strings. + * + * This traverses the packed prefix and suffix lengths, summing them to obtain the total + * number of bytes needed for the decoded string data. It also calculates an upper bound + * for the largest string length to obtain an upper bound on temporary space needed if + * rows will be skipped. + * + * Called with 64 threads. + * + * @param data Pointer to the start of the page data stream + * @param end Pointer to the end of the page data stream + * @param start_value Do not count values that occur before this index + * @param end_value Do not count values that occur after this index + * @return A pair of `size_t` values representing the total string size and temp buffer size + * required for decoding + */ +__device__ thrust::pair totalDeltaByteArraySize(uint8_t const* data, + uint8_t const* end, + int start_value, + int end_value) +{ + using cudf::detail::warp_size; + using WarpReduce = cub::WarpReduce; + __shared__ typename WarpReduce::TempStorage temp_storage[2]; + + __shared__ __align__(16) delta_binary_decoder prefixes; + __shared__ __align__(16) delta_binary_decoder suffixes; + + int const t = threadIdx.x; + int const lane_id = t % warp_size; + int const warp_id = t / warp_size; + + if (t == 0) { + auto const* suffix_start = prefixes.find_end_of_block(data, end); + suffixes.init_binary_block(suffix_start, end); + } + __syncthreads(); + + // two warps will traverse the prefixes and suffixes and sum them up + auto const db = t < warp_size ? &prefixes : t < 2 * warp_size ? &suffixes : nullptr; + + size_t total_bytes = 0; + uleb128_t max_len = 0; + + if (db != nullptr) { + // initialize with first value (which is stored in last_value) + if (lane_id == 0 && start_value == 0) { total_bytes = db->last_value; } + + uleb128_t lane_sum = 0; + uleb128_t lane_max = 0; + while (db->current_value_idx < end_value && + db->current_value_idx < db->num_encoded_values(true)) { + // calculate values for current mini-block + db->calc_mini_block_values(lane_id); + + // get per lane sum for mini-block + for (uint32_t i = 0; i < db->values_per_mb; i += 32) { + uint32_t const idx = db->current_value_idx + i + lane_id; + if (idx >= start_value && idx < end_value && idx < db->value_count) { + lane_sum += db->value[rolling_index(idx)]; + lane_max = max(lane_max, db->value[rolling_index(idx)]); + } + } + + if (lane_id == 0) { db->setup_next_mini_block(true); } + __syncwarp(); + } + + // get sum for warp. + // note: warp_sum will only be valid on lane 0. 
+    auto const warp_sum = WarpReduce(temp_storage[warp_id]).Sum(lane_sum);
+    auto const warp_max = WarpReduce(temp_storage[warp_id]).Reduce(lane_max, cub::Max());
+
+    if (lane_id == 0) {
+      total_bytes += warp_sum;
+      max_len = warp_max;
+    }
+  }
+  __syncthreads();
+
+  // now sum up total_bytes from the two warps
+  auto const final_bytes =
+    cudf::detail::single_lane_block_sum_reduce<delta_preproc_block_size>(total_bytes);
+
+  // Sum up prefix and suffix max lengths to get a max possible string length. Multiply that
+  // by the number of strings in a mini-block, plus one to save the last string.
+  auto const temp_bytes =
+    cudf::detail::single_lane_block_sum_reduce<delta_preproc_block_size>(max_len) *
+    (db->values_per_mb + 1);
+
+  return {final_bytes, temp_bytes};
+}
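Editor's note: a worked example of the temp-buffer bound computed above, with illustrative numbers only:

```cpp
#include <cstddef>

// If the largest prefix length seen in the page is 12 bytes and the largest
// suffix length is 20, no single decoded string can exceed 32 bytes. Skipping
// may buffer one full mini-block of such strings plus a saved copy of
// last_string, hence the (values_per_mb + 1) factor.
constexpr std::size_t max_prefix_len = 12;
constexpr std::size_t max_suffix_len = 20;
constexpr std::size_t values_per_mb  = 64;

constexpr std::size_t temp_bytes = (max_prefix_len + max_suffix_len) * (values_per_mb + 1);
static_assert(temp_bytes == 2080, "an upper bound; assumes every string is maximal");
```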
+
+/**
+ * @brief Kernel for computing string page bounds information.
 *
- * String columns need accurate data size information to preallocate memory in the column buffer to
- * store the char data. This calls a kernel to calculate information needed by the string decoding
- * kernel. On exit, the `str_bytes`, `num_nulls`, and `num_valids` fields of the PageInfo struct
- * are updated. This call ignores non-string columns.
+ * This kernel traverses the repetition and definition level data to determine start and end values
+ * for pages with string-like data. Also calculates the number of null and valid values in the
+ * page. Does nothing if the page mask is neither `STRING` nor `DELTA_BYTE_ARRAY`. On exit the
+ * `num_nulls`, `num_valids`, `start_val` and `end_val` fields of the `PageInfo` struct will be
+ * populated.
 *
 * @param pages All pages to be decoded
 * @param chunks All chunks to be decoded
@@ -464,7 +567,7 @@ __device__ size_t totalPlainEntriesSize(uint8_t const* data,
 * @tparam level_t Type used to store decoded repetition and definition levels
 */
 template <typename level_t>
-__global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSizes(
+__global__ void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBounds(
   PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
 {
   __shared__ __align__(16) page_state_s state_g;
@@ -474,8 +577,13 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
   int const t        = threadIdx.x;
   PageInfo* const pp = &pages[page_idx];
 
-  // reset str_bytes to 0 in case it's already been calculated
-  if (t == 0) { pp->str_bytes = 0; }
+  if (t == 0) {
+    s->page.num_nulls  = 0;
+    s->page.num_valids = 0;
+    // reset str_bytes to 0 in case it's already been calculated (esp needed for chunked reads).
+    // TODO: need to rethink this once str_bytes is in the statistics
+    pp->str_bytes = 0;
+  }
 
   // whether or not we have repetition levels (lists)
   bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;
@@ -491,23 +599,11 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
       {rep_runs}};
 
   // setup page info
-  if (!setupLocalPageInfo(
-        s, pp, chunks, min_row, num_rows, mask_filter{KERNEL_MASK_STRING}, false)) {
-    return;
-  }
-
-  if (!t) {
-    s->page.num_nulls  = 0;
-    s->page.num_valids = 0;
-    s->page.str_bytes  = 0;
-  }
-  __syncthreads();
+  auto const mask = BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY);
+  if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
-  // if we're skipping this page anyway, no need to count it
-  if (!is_bounds_pg && !is_page_contained(s, min_row, num_rows)) { return; }
-
   // find start/end value indices
   auto const [start_value, end_value] =
     page_bounds(s, min_row, num_rows, is_bounds_pg, has_repetition, decoders);
@@ -516,7 +612,106 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
   if (t == 0) {
     pp->num_nulls  = s->page.num_nulls;
     pp->num_valids = s->page.num_valids;
+    pp->start_val  = start_value;
+    pp->end_val    = end_value;
   }
+}
+
+/**
+ * @brief Kernel for computing string page output size information for delta_byte_array encoding.
+ *
+ * This call ignores columns that are not DELTA_BYTE_ARRAY encoded. On exit the `str_bytes` field
+ * of the `PageInfo` struct will be populated. Also fills in the `temp_string_size` field if rows
+ * are to be skipped.
+ *
+ * @param pages All pages to be decoded
+ * @param chunks All chunks to be decoded
+ * @param min_rows crop all rows below min_row
+ * @param num_rows Maximum number of rows to read
+ */
+__global__ void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPageStringSizes(
+  PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
+{
+  __shared__ __align__(16) page_state_s state_g;
+
+  page_state_s* const s = &state_g;
+  int const page_idx    = blockIdx.x;
+  int const t           = threadIdx.x;
+  PageInfo* const pp    = &pages[page_idx];
+
+  // whether or not we have repetition levels (lists)
+  bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;
+
+  // setup page info
+  auto const mask = decode_kernel_mask::DELTA_BYTE_ARRAY;
+  if (!setupLocalPageInfo(s, pp, chunks, min_row, num_rows, mask_filter{mask}, true)) { return; }
+
+  auto const start_value = pp->start_val;
+
+  // if data size is known, can short circuit here
+  if ((chunks[pp->chunk_idx].data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
+    if (t == 0) {
+      pp->str_bytes = pp->num_valids * s->dtype_len_in;
+
+      // only need temp space if we're skipping values
+      if (start_value > 0) {
+        // just need to parse the header of the first delta binary block to get values_per_mb
+        delta_binary_decoder db;
+        db.init_binary_block(s->data_start, s->data_end);
+        // save enough for one mini-block plus some extra to save the last_string
+        pp->temp_string_size = s->dtype_len_in * (db.values_per_mb + 1);
+      }
+    }
+  } else {
+    // now process string info in the range [start_value, end_value)
+    // set up for decoding strings...can be either plain or dictionary
+    uint8_t const* data      = s->data_start;
+    uint8_t const* const end = s->data_end;
+    auto const end_value     = pp->end_val;
+
+    auto const [len, temp_bytes] = totalDeltaByteArraySize(data, end, start_value, end_value);
+
+    if (t == 0) {
+      // TODO check for overflow
+      pp->str_bytes = len;
+
+      // only need temp space if we're skipping values
+      if (start_value > 0) { pp->temp_string_size = temp_bytes; }
+    }
+  }
+}
+
+/**
+ * @brief Kernel for computing string page output size information.
+ *
+ * This call ignores non-string columns. On exit the `str_bytes` field of the `PageInfo` struct
+ * will be populated.
+ *
+ * @param pages All pages to be decoded
+ * @param chunks All chunks to be decoded
+ * @param min_rows crop all rows below min_row
+ * @param num_rows Maximum number of rows to read
+ */
+__global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSizes(
+  PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
+{
+  __shared__ __align__(16) page_state_s state_g;
+
+  page_state_s* const s = &state_g;
+  int const page_idx    = blockIdx.x;
+  int const t           = threadIdx.x;
+  PageInfo* const pp    = &pages[page_idx];
+
+  // whether or not we have repetition levels (lists)
+  bool const has_repetition = chunks[pp->chunk_idx].max_level[level_type::REPETITION] > 0;
+
+  // setup page info
+  if (!setupLocalPageInfo(
+        s, pp, chunks, min_row, num_rows, mask_filter{decode_kernel_mask::STRING}, true)) {
+    return;
+  }
+
+  bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
   auto const& col  = s->col;
   size_t str_bytes = 0;
@@ -530,6 +725,8 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
     uint8_t const* const end = s->data_end;
     uint8_t const* dict_base = nullptr;
     int dict_size            = 0;
+    auto const start_value   = pp->start_val;
+    auto const end_value     = pp->end_val;
 
     switch (pp->encoding) {
       case Encoding::PLAIN_DICTIONARY:
@@ -561,6 +758,9 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
   if (t == 0) {
     // TODO check for overflow
     pp->str_bytes = str_bytes;
+
+    // only need temp space for delta
+    pp->temp_string_size = 0;
   }
 }
 
@@ -586,6 +786,7 @@ __global__ void __launch_bounds__(decode_block_size)
                        size_t num_rows,
                        int32_t* error_code)
 {
+  using cudf::detail::warp_size;
   __shared__ __align__(16) page_state_s state_g;
   __shared__ __align__(4) size_type last_offset;
   __shared__ __align__(16)
     page_state_buffers_s<rolling_buf_size, rolling_buf_size, rolling_buf_size> state_buffers;
@@ -596,10 +797,12 @@ __global__ void __launch_bounds__(decode_block_size)
   auto* const sb     = &state_buffers;
   int const page_idx = blockIdx.x;
   int const t        = threadIdx.x;
+  int const lane_id  = t % warp_size;
   [[maybe_unused]] null_count_back_copier _{s, t};
 
+  auto const mask = decode_kernel_mask::STRING;
   if (!setupLocalPageInfo(
-        s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{KERNEL_MASK_STRING}, true)) {
+        s, &pages[page_idx], chunks, min_row, num_rows, mask_filter{mask}, true)) {
     return;
   }
 
@@ -630,6 +833,7 @@ __global__ void __launch_bounds__(decode_block_size)
       target_pos = min(s->nz_count, src_pos + decode_block_size - out_thread0);
       if (out_thread0 > 32) { target_pos = min(target_pos, s->dict_pos); }
     }
+    // TODO(ets): see if this sync can be removed
    __syncthreads();
    if (t < 32) {
      // decode repetition and definition levels.
@@ -643,9 +847,9 @@ __global__ void __launch_bounds__(decode_block_size)
 
       // WARP1: Decode dictionary indices, booleans or string positions
       if (s->dict_base) {
-        src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, t & 0x1f).first;
+        src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, lane_id).first;
       } else {
-        gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
+        gpuInitStringDescriptors<false>(s, sb, src_target_pos, lane_id);
       }
       if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; }
     } else {
@@ -748,6 +952,19 @@ __global__ void __launch_bounds__(decode_block_size)
   if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
 }
 
+// Functor used to set the `temp_string_buf` pointer for each page. `data` points to a buffer
+// to be used when skipping rows in the delta_byte_array decoder. Given a page and an offset,
+// set the page's `temp_string_buf` to be `data + offset`.
+struct page_tform_functor {
+  uint8_t* const data;
+
+  __device__ PageInfo operator()(PageInfo& page, int64_t offset)
+  {
+    if (page.temp_string_size != 0) { page.temp_string_buf = data + offset; }
+    return page;
+  }
+};
+
 }  // anonymous namespace
 
 /**
@@ -755,20 +972,81 @@ __global__ void __launch_bounds__(decode_block_size)
 */
 void ComputePageStringSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
                             cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                            rmm::device_uvector<uint8_t>& temp_string_buf,
                             size_t min_row,
                             size_t num_rows,
                             int level_type_size,
+                            uint32_t kernel_mask,
                             rmm::cuda_stream_view stream)
 {
-  dim3 dim_block(preprocess_block_size, 1);
-  dim3 dim_grid(pages.size(), 1);  // 1 threadblock per page
+  dim3 const dim_block(preprocess_block_size, 1);
+  dim3 const dim_grid(pages.size(), 1);  // 1 threadblock per page
   if (level_type_size == 1) {
-    gpuComputePageStringSizes<uint8_t>
+    gpuComputeStringPageBounds<uint8_t>
       <<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
   } else {
-    gpuComputePageStringSizes<uint16_t>
+    gpuComputeStringPageBounds<uint16_t>
      <<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
   }
+
+  // kernel mask may contain other kernels we don't need to count
+  int const count_mask =
+    kernel_mask & BitOr(decode_kernel_mask::DELTA_BYTE_ARRAY, decode_kernel_mask::STRING);
+  int const nkernels = std::bitset<32>(count_mask).count();
+  auto const streams = cudf::detail::fork_streams(stream, nkernels);
+
+  int s_idx = 0;
+  if (BitAnd(kernel_mask, decode_kernel_mask::DELTA_BYTE_ARRAY) != 0) {
+    dim3 dim_delta(delta_preproc_block_size, 1);
+    gpuComputeDeltaPageStringSizes<<<dim_grid, dim_delta, 0, streams[s_idx++].value()>>>(
+      pages.device_ptr(), chunks, min_row, num_rows);
+  }
+  if (BitAnd(kernel_mask, decode_kernel_mask::STRING) != 0) {
+    gpuComputePageStringSizes<<<dim_grid, dim_block, 0, streams[s_idx++].value()>>>(
+      pages.device_ptr(), chunks, min_row, num_rows);
+  }
+
+  // synchronize the streams
+  cudf::detail::join_streams(streams, stream);
+
+  // check for needed temp space for DELTA_BYTE_ARRAY
+  auto const need_sizes = thrust::any_of(
+    rmm::exec_policy(stream), pages.d_begin(), pages.d_end(), [] __device__(auto& page) {
+      return page.temp_string_size != 0;
+    });
+
+  if (need_sizes) {
+    // sum up all of the temp_string_sizes
+    auto const page_sizes = [] __device__(PageInfo const& page) { return page.temp_string_size; };
+    auto const total_size = thrust::transform_reduce(rmm::exec_policy(stream),
+                                                     pages.d_begin(),
+                                                     pages.d_end(),
+                                                     page_sizes,
+                                                     0L,
+                                                     thrust::plus{});
+
+    // now do an exclusive scan over the temp_string_sizes to get offsets for each
+    // page's chunk of the temp buffer
+    rmm::device_uvector<int64_t> page_string_offsets(pages.size(), stream);
+    thrust::transform_exclusive_scan(rmm::exec_policy_nosync(stream),
+                                     pages.d_begin(),
+                                     pages.d_end(),
+                                     page_string_offsets.begin(),
+                                     page_sizes,
+                                     0L,
+                                     thrust::plus{});
+
+    // allocate the temp space
+    temp_string_buf.resize(total_size, stream);
+
+    // now use the offsets array to set each page's temp_string_buf pointers
+    thrust::transform(rmm::exec_policy_nosync(stream),
+                      pages.d_begin(),
+                      pages.d_end(),
+                      page_string_offsets.begin(),
+                      pages.d_begin(),
+                      page_tform_functor{temp_string_buf.data()});
+  }
 }
 
 /**
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 68851e72663..129d4e4d28c 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -35,6 +35,7 @@
 
 #include
+#include
 #include
 
 namespace cudf::io::parquet::detail {
@@ -64,7 +65,8 @@ constexpr bool is_supported_encoding(Encoding enc)
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE:
     case Encoding::RLE_DICTIONARY:
-    case Encoding::DELTA_BINARY_PACKED: return true;
+    case Encoding::DELTA_BINARY_PACKED:
+    case Encoding::DELTA_BYTE_ARRAY: return true;
     default: return false;
   }
 }
@@ -86,13 +88,15 @@ constexpr void set_error(int32_t error, int32_t* error_code)
 * These values are used as bitmasks, so they must be powers of 2.
 */
 enum class decode_error : int32_t {
-  DATA_STREAM_OVERRUN  = 0x1,
-  LEVEL_STREAM_OVERRUN = 0x2,
-  UNSUPPORTED_ENCODING = 0x4,
-  INVALID_LEVEL_RUN    = 0x8,
-  INVALID_DATA_TYPE    = 0x10,
-  EMPTY_PAGE           = 0x20,
-  INVALID_DICT_WIDTH   = 0x40,
+  DATA_STREAM_OVERRUN      = 0x1,
+  LEVEL_STREAM_OVERRUN     = 0x2,
+  UNSUPPORTED_ENCODING     = 0x4,
+  INVALID_LEVEL_RUN        = 0x8,
+  INVALID_DATA_TYPE        = 0x10,
+  EMPTY_PAGE               = 0x20,
+  INVALID_DICT_WIDTH       = 0x40,
+  DELTA_PARAM_MISMATCH     = 0x80,
+  DELTA_PARAMS_UNSUPPORTED = 0x100,
 };
 
 /**
@@ -145,6 +149,17 @@ constexpr uint32_t BitAnd(T1 a, T2 b)
   return static_cast<uint32_t>(a) & static_cast<uint32_t>(b);
 }
 
+template <typename T1,
+          typename T2,
+          typename std::enable_if_t<(is_scoped_enum<T1>::value and std::is_same_v<T2, uint32_t>) or
+                                    (is_scoped_enum<T2>::value and std::is_same_v<T1, uint32_t>) or
+                                    (is_scoped_enum<T1>::value and std::is_same_v<T1, T2>)>* =
+            nullptr>
+constexpr uint32_t BitOr(T1 a, T2 b)
+{
+  return static_cast<uint32_t>(a) | static_cast<uint32_t>(b);
+}
+
 /**
  * @brief Enums for the flags in the page header
  */
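Editor's note: a short usage sketch of the two helpers just defined (it relies on the `BitAnd`/`BitOr` declarations above and the `decode_kernel_mask` enum that follows); both of these exact usages appear elsewhere in this PR:

```cpp
// Build a multi-kernel mask without casts at the call site...
uint32_t const string_like =
  BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY);

// ...and test membership the same way.
bool const wants_delta = BitAnd(string_like, decode_kernel_mask::DELTA_BYTE_ARRAY) != 0;
```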
@@ -168,10 +183,12 @@ enum level_type {
 *
 * Used to control which decode kernels to run.
 */
-enum kernel_mask_bits {
-  KERNEL_MASK_GENERAL      = (1 << 0),  // Run catch-all decode kernel
-  KERNEL_MASK_STRING       = (1 << 1),  // Run decode kernel for string data
-  KERNEL_MASK_DELTA_BINARY = (1 << 2)   // Run decode kernel for DELTA_BINARY_PACKED data
+enum class decode_kernel_mask {
+  NONE             = 0,
+  GENERAL          = (1 << 0),  // Run catch-all decode kernel
+  STRING           = (1 << 1),  // Run decode kernel for string data
+  DELTA_BINARY     = (1 << 2),  // Run decode kernel for DELTA_BINARY_PACKED data
+  DELTA_BYTE_ARRAY = (1 << 3)   // Run decode kernel for DELTA_BYTE_ARRAY encoded data
 };
 
 /**
@@ -252,9 +269,11 @@ struct PageInfo {
   int32_t num_input_values;
   int32_t chunk_row;  // starting row of this page relative to the start of the chunk
   int32_t num_rows;   // number of rows in this page
-  // the next two are calculated in gpuComputePageStringSizes
+  // the next four are calculated in gpuComputePageStringSizes
   int32_t num_nulls;   // number of null values (V2 header), but recalculated for string cols
   int32_t num_valids;  // number of non-null values, taking into account skip_rows/num_rows
+  int32_t start_val;   // index of first value of the string data stream to use
+  int32_t end_val;     // index of last value in string data stream
   int32_t chunk_idx;       // column chunk this page belongs to
   int32_t src_col_schema;  // schema index of this column
   uint8_t flags;           // PAGEINFO_FLAGS_XXX
@@ -291,7 +310,11 @@ struct PageInfo {
   // level decode buffers
   uint8_t* lvl_decode_buf[level_type::NUM_LEVEL_TYPES];
 
-  uint32_t kernel_mask;
+  // temporary space for decoding DELTA_BYTE_ARRAY encoded strings
+  int64_t temp_string_size;
+  uint8_t* temp_string_buf;
+
+  decode_kernel_mask kernel_mask;
 };
 
 /**
@@ -597,16 +620,20 @@ void ComputePageSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
 *
 * @param[in,out] pages All pages to be decoded
 * @param[in] chunks All chunks to be decoded
+ * @param[out] temp_string_buf Temporary space needed for decoding DELTA_BYTE_ARRAY strings
 * @param[in] min_rows crop all rows below min_row
 * @param[in] num_rows Maximum number of rows to read
 * @param[in] level_type_size Size in bytes of the type for level decoding
+ * @param[in] kernel_mask Mask of kernels to run
 * @param[in] stream CUDA stream to use
 */
 void ComputePageStringSizes(cudf::detail::hostdevice_vector<PageInfo>& pages,
                             cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                            rmm::device_uvector<uint8_t>& temp_string_buf,
                             size_t min_row,
                             size_t num_rows,
                             int level_type_size,
+                            uint32_t kernel_mask,
                             rmm::cuda_stream_view stream);
 
 /**
@@ -665,7 +692,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
 * @param[in] min_row Minimum number of rows to read
 * @param[in] level_type_size Size in bytes of the type for level decoding
 * @param[out] error_code Error code for kernel failures
- * @param[in] stream CUDA stream to use, default 0
+ * @param[in] stream CUDA stream to use
 */
 void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
                        cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
                        size_t num_rows,
                        size_t min_row,
                        int level_type_size,
                        int32_t* error_code,
                        rmm::cuda_stream_view stream);
 
+/**
+ * @brief Launches kernel for reading the DELTA_BYTE_ARRAY column data stored in the pages
+ *
+ * The page data will be written to the output pointed to in the page's
+ * associated column chunk.
+ *
+ * @param[in,out] pages All pages to be decoded
+ * @param[in] chunks All chunks to be decoded
+ * @param[in] num_rows Total number of rows to read
+ * @param[in] min_row Minimum number of rows to read
+ * @param[in] level_type_size Size in bytes of the type for level decoding
+ * @param[out] error_code Error code for kernel failures
+ * @param[in] stream CUDA stream to use
+ */
+void DecodeDeltaByteArray(cudf::detail::hostdevice_vector<PageInfo>& pages,
+                          cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                          size_t num_rows,
+                          size_t min_row,
+                          int level_type_size,
+                          int32_t* error_code,
+                          rmm::cuda_stream_view stream);
+
 /**
  * @brief Launches kernel for initializing encoder row group fragments
  *
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index 11c20d0e540..6e799424d01 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -21,7 +21,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 
@@ -30,10 +29,15 @@ namespace cudf::io::parquet::detail {
 
 void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
 {
-  auto& chunks              = _pass_itm_data->chunks;
-  auto& pages               = _pass_itm_data->pages_info;
-  auto& page_nesting        = _pass_itm_data->page_nesting_info;
-  auto& page_nesting_decode = _pass_itm_data->page_nesting_decode_info;
+  auto& chunks               = _pass_itm_data->chunks;
+  auto& pages                = _pass_itm_data->pages_info;
+  auto& page_nesting         = _pass_itm_data->page_nesting_info;
+  auto& page_nesting_decode  = _pass_itm_data->page_nesting_decode_info;
+  auto const level_type_size = _pass_itm_data->level_type_size;
+
+  // temporary space for DELTA_BYTE_ARRAY decoding. this only needs to live until
+  // gpu::DecodeDeltaByteArray returns.
+  rmm::device_uvector<uint8_t> delta_temp_buf(0, _stream);
 
   // Should not reach here if there is no page data.
   CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
@@ -52,11 +56,12 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   // doing a gather operation later on.
   // TODO: This step is somewhat redundant if size info has already been calculated (nested schema,
   // chunked reader).
-  auto const has_strings = (kernel_mask & KERNEL_MASK_STRING) != 0;
+  auto const has_strings =
+    (kernel_mask & BitOr(decode_kernel_mask::STRING, decode_kernel_mask::DELTA_BYTE_ARRAY)) != 0;
   std::vector<size_t> col_sizes(_input_columns.size(), 0L);
   if (has_strings) {
     ComputePageStringSizes(
-      pages, chunks, skip_rows, num_rows, _pass_itm_data->level_type_size, _stream);
+      pages, chunks, delta_temp_buf, skip_rows, num_rows, level_type_size, kernel_mask, _stream);
 
     col_sizes = calculate_page_string_offsets();
 
@@ -163,6 +168,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   chunks.host_to_device_async(_stream);
   chunk_nested_valids.host_to_device_async(_stream);
   chunk_nested_data.host_to_device_async(_stream);
+  if (has_strings) { chunk_nested_str_data.host_to_device_async(_stream); }
 
   // create this before we fork streams
   kernel_error error_code(_stream);
@@ -171,25 +177,27 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   int const nkernels = std::bitset<32>(kernel_mask).count();
   auto streams       = cudf::detail::fork_streams(_stream, nkernels);
 
-  auto const level_type_size = _pass_itm_data->level_type_size;
-
   // launch string decoder
   int s_idx = 0;
-  if (has_strings) {
-    auto& stream = streams[s_idx++];
-    chunk_nested_str_data.host_to_device_async(stream);
+  if (BitAnd(kernel_mask, decode_kernel_mask::STRING) != 0) {
     DecodeStringPageData(
-      pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), stream);
+      pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]);
+  }
+
+  // launch delta byte array decoder
+  if (BitAnd(kernel_mask, decode_kernel_mask::DELTA_BYTE_ARRAY) != 0) {
+    DecodeDeltaByteArray(
+      pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]);
   }
 
   // launch delta binary decoder
-  if ((kernel_mask & KERNEL_MASK_DELTA_BINARY) != 0) {
+  if (BitAnd(kernel_mask, decode_kernel_mask::DELTA_BINARY) != 0) {
     DecodeDeltaBinary(
       pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]);
   }
 
   // launch the catch-all page decoder
-  if ((kernel_mask & KERNEL_MASK_GENERAL) != 0) {
+  if (BitAnd(kernel_mask, decode_kernel_mask::GENERAL) != 0) {
     DecodePageData(
       pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]);
   }
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 80a4d00a5a2..0bc492546e9 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -1416,7 +1416,7 @@ std::vector<size_t> reader::impl::calculate_page_string_offsets()
     page_index.begin(),
     page_to_string_size{pages.device_ptr(), chunks.device_ptr()});
 
   // do scan by key to calculate string offsets for each page
-  thrust::exclusive_scan_by_key(rmm::exec_policy(_stream),
+  thrust::exclusive_scan_by_key(rmm::exec_policy_nosync(_stream),
                                 page_keys.begin(),
                                 page_keys.end(),
                                 val_iter,
@@ -1424,7 +1424,7 @@ std::vector<size_t> reader::impl::calculate_page_string_offsets()
 
   // now sum up page sizes
   rmm::device_uvector<int32_t> reduce_keys(col_sizes.size(), _stream);
-  thrust::reduce_by_key(rmm::exec_policy(_stream),
+  thrust::reduce_by_key(rmm::exec_policy_nosync(_stream),
                         page_keys.begin(),
                         page_keys.end(),
                         val_iter,
diff --git a/python/cudf/cudf/tests/data/parquet/delta_byte_arr.parquet b/python/cudf/cudf/tests/data/parquet/delta_byte_arr.parquet
new file mode 100644
index 00000000000..7f6006a75bf
Binary files /dev/null and
b/python/cudf/cudf/tests/data/parquet/delta_byte_arr.parquet differ diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 0c59fd0e5aa..af4d0294293 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1284,6 +1284,15 @@ def test_parquet_reader_v2(tmpdir, simple_pdf): assert_eq(cudf.read_parquet(pdf_fname), simple_pdf) +def test_parquet_delta_byte_array(datadir): + fname = datadir / "delta_byte_arr.parquet" + assert_eq(cudf.read_parquet(fname), pd.read_parquet(fname)) + + +def delta_num_rows(): + return [1, 2, 23, 32, 33, 34, 64, 65, 66, 128, 129, 130, 20000, 50000] + + @pytest.mark.parametrize("nrows", [1, 100000]) @pytest.mark.parametrize("add_nulls", [True, False]) @pytest.mark.parametrize( @@ -1320,6 +1329,7 @@ def test_delta_binary(nrows, add_nulls, dtype, tmpdir): version="2.6", column_encoding="DELTA_BINARY_PACKED", data_page_version="2.0", + data_page_size=64 * 1024, engine="pyarrow", use_dictionary=False, ) @@ -1350,6 +1360,100 @@ def test_delta_binary(nrows, add_nulls, dtype, tmpdir): assert_eq(cdf2, cdf) +@pytest.mark.parametrize("nrows", delta_num_rows()) +@pytest.mark.parametrize("add_nulls", [True, False]) +@pytest.mark.parametrize("str_encoding", ["DELTA_BYTE_ARRAY"]) +def test_delta_byte_array_roundtrip(nrows, add_nulls, str_encoding, tmpdir): + null_frequency = 0.25 if add_nulls else 0 + + # Create a pandas dataframe with random data of mixed lengths + test_pdf = dg.rand_dataframe( + dtypes_meta=[ + { + "dtype": "str", + "null_frequency": null_frequency, + "cardinality": nrows, + "max_string_length": 10, + }, + { + "dtype": "str", + "null_frequency": null_frequency, + "cardinality": nrows, + "max_string_length": 100, + }, + ], + rows=nrows, + seed=0, + use_threads=False, + ).to_pandas() + + pdf_fname = tmpdir.join("pdfdeltaba.parquet") + test_pdf.to_parquet( + pdf_fname, + version="2.6", + column_encoding=str_encoding, + data_page_version="2.0", + data_page_size=64 * 1024, + engine="pyarrow", + use_dictionary=False, + ) + cdf = cudf.read_parquet(pdf_fname) + pcdf = cudf.from_pandas(test_pdf) + assert_eq(cdf, pcdf) + + +@pytest.mark.parametrize("nrows", delta_num_rows()) +@pytest.mark.parametrize("add_nulls", [True, False]) +@pytest.mark.parametrize("str_encoding", ["DELTA_BYTE_ARRAY"]) +def test_delta_struct_list(tmpdir, nrows, add_nulls, str_encoding): + # Struct> + lists_per_row = 3 + list_size = 4 + num_rows = nrows + include_validity = add_nulls + + def list_gen_wrapped(x, y): + return list_row_gen( + int_gen, x * list_size * lists_per_row, list_size, lists_per_row + ) + + def string_list_gen_wrapped(x, y): + return list_row_gen( + string_gen, + x * list_size * lists_per_row, + list_size, + lists_per_row, + include_validity, + ) + + data = struct_gen( + [int_gen, string_gen, list_gen_wrapped, string_list_gen_wrapped], + 0, + num_rows, + include_validity, + ) + test_pdf = pa.Table.from_pydict({"sol": data}).to_pandas() + pdf_fname = tmpdir.join("pdfdeltaba.parquet") + test_pdf.to_parquet( + pdf_fname, + version="2.6", + column_encoding={ + "sol.col0": "DELTA_BINARY_PACKED", + "sol.col1": str_encoding, + "sol.col2.list.element.list.element": "DELTA_BINARY_PACKED", + "sol.col3.list.element.list.element": str_encoding, + }, + data_page_version="2.0", + data_page_size=64 * 1024, + engine="pyarrow", + use_dictionary=False, + ) + # sanity check to verify file is written properly + assert_eq(test_pdf, pd.read_parquet(pdf_fname)) + cdf = cudf.read_parquet(pdf_fname) + assert_eq(cdf, 
cudf.from_pandas(test_pdf)) + + @pytest.mark.parametrize( "data", [