Skip to content

Commit

Permalink
Fix reading of RLE encoded boolean data from parquet files with V2 pa…
Browse files Browse the repository at this point in the history
…ge headers (#13707)

The current parquet reader assumes that repetition or definition level data with a bit length of 0 will have no data encoded in the header.  In the case of V2 headers, this assumption is false. This PR checks the V2 page header data to see if level data needs to be accounted for. Also fixes an error that was present in the RLE data decoder where the encoded length of the RLE data was not skipped properly.

Fixes #13655

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - https://github.com/nvdbaranec
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #13707
  • Loading branch information
etseidl authored Jul 26, 2023
1 parent f8e5a89 commit 55894bf
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 31 deletions.
82 changes: 54 additions & 28 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -871,45 +871,65 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s,
level_type lvl)
{
int32_t len;
int level_bits = s->col.level_bits[lvl];
Encoding encoding = lvl == level_type::DEFINITION ? s->page.definition_level_encoding
: s->page.repetition_level_encoding;
int const level_bits = s->col.level_bits[lvl];
auto const encoding = lvl == level_type::DEFINITION ? s->page.definition_level_encoding
: s->page.repetition_level_encoding;

auto start = cur;
if (level_bits == 0) {
len = 0;
s->initial_rle_run[lvl] = s->page.num_input_values * 2; // repeated value
s->initial_rle_value[lvl] = 0;
s->lvl_start[lvl] = cur;
s->abs_lvl_start[lvl] = cur;
} else if (encoding == Encoding::RLE) {
// V2 only uses RLE encoding, so only perform check here
if (s->page.def_lvl_bytes || s->page.rep_lvl_bytes) {
len = lvl == level_type::DEFINITION ? s->page.def_lvl_bytes : s->page.rep_lvl_bytes;
} else if (cur + 4 < end) {
len = 4 + (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
} else {
len = 0;
s->error = 2;
}
s->abs_lvl_start[lvl] = cur;
if (!s->error) {
uint32_t run = get_vlq32(cur, end);
s->initial_rle_run[lvl] = run;
if (!(run & 1)) {
int v = (cur < end) ? cur[0] : 0;

auto init_rle = [s, lvl, end, level_bits](uint8_t const* cur, uint8_t const* end) {
uint32_t const run = get_vlq32(cur, end);
s->initial_rle_run[lvl] = run;
if (!(run & 1)) {
if (cur < end) {
int v = cur[0];
cur++;
if (level_bits > 8) {
v |= ((cur < end) ? cur[0] : 0) << 8;
cur++;
}
s->initial_rle_value[lvl] = v;
} else {
s->initial_rle_value[lvl] = 0;
}
s->lvl_start[lvl] = cur;
}
s->lvl_start[lvl] = cur;

if (cur > end) { s->error = 2; }
};

// this is a little redundant. if level_bits == 0, then nothing should be encoded
// for the level, but some V2 files in the wild violate this and encode the data anyway.
// thus we will handle V2 headers separately.
if ((s->page.flags & PAGEINFO_FLAGS_V2) != 0) {
// V2 only uses RLE encoding so no need to check encoding
len = lvl == level_type::DEFINITION ? s->page.def_lvl_bytes : s->page.rep_lvl_bytes;
s->abs_lvl_start[lvl] = cur;
if (len == 0) {
s->initial_rle_run[lvl] = s->page.num_input_values * 2; // repeated value
s->initial_rle_value[lvl] = 0;
s->lvl_start[lvl] = cur;
} else {
init_rle(cur, cur + len);
}
} else if (level_bits == 0) {
len = 0;
s->initial_rle_run[lvl] = s->page.num_input_values * 2; // repeated value
s->initial_rle_value[lvl] = 0;
s->lvl_start[lvl] = cur;
s->abs_lvl_start[lvl] = cur;
} else if (encoding == Encoding::RLE) { // V1 header with RLE encoding
if (cur + 4 < end) {
len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
s->abs_lvl_start[lvl] = cur;
init_rle(cur, cur + len);
// add back the 4 bytes for the length
len += 4;
} else {
len = 0;
s->error = 2;
}
} else if (encoding == Encoding::BIT_PACKED) {
len = (s->page.num_input_values * level_bits + 7) >> 3;
s->initial_rle_run[lvl] = ((s->page.num_input_values + 7) >> 3) * 2 + 1; // literal run
Expand Down Expand Up @@ -1247,7 +1267,13 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_val = 0;
if ((s->col.data_type & 7) == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; }
break;
case Encoding::RLE: s->dict_run = 0; break;
case Encoding::RLE: {
// first 4 bytes are length of RLE data
int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24);
cur += 4;
if (cur + len > end) { s->error = 2; }
s->dict_run = 0;
} break;
default:
s->error = 1; // Unsupported encoding
break;
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ __global__ void __launch_bounds__(128)
// definition levels
bs->page.chunk_row += bs->page.num_rows;
bs->page.num_rows = 0;
bs->page.flags = 0;
// zero out V2 info
bs->page.num_nulls = 0;
bs->page.def_lvl_bytes = 0;
Expand All @@ -395,7 +396,6 @@ __global__ void __launch_bounds__(128)
case PageType::DATA_PAGE:
index_out = num_dict_pages + data_page_count;
data_page_count++;
bs->page.flags = 0;
// this computation is only valid for flat schemas. for nested schemas,
// they will be recomputed in the preprocess step by examining repetition and
// definition levels
Expand All @@ -405,7 +405,7 @@ __global__ void __launch_bounds__(128)
case PageType::DATA_PAGE_V2:
index_out = num_dict_pages + data_page_count;
data_page_count++;
bs->page.flags = 0;
bs->page.flags |= PAGEINFO_FLAGS_V2;
values_found += bs->page.num_input_values;
// V2 only uses RLE, so it was removed from the header
bs->page.definition_level_encoding = Encoding::RLE;
Expand All @@ -414,7 +414,7 @@ __global__ void __launch_bounds__(128)
case PageType::DICTIONARY_PAGE:
index_out = dictionary_page_count;
dictionary_page_count++;
bs->page.flags = PAGEINFO_FLAGS_DICTIONARY;
bs->page.flags |= PAGEINFO_FLAGS_DICTIONARY;
break;
default: index_out = -1; break;
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ namespace gpu {
*/
enum {
PAGEINFO_FLAGS_DICTIONARY = (1 << 0), // Indicates a dictionary page
PAGEINFO_FLAGS_V2 = (1 << 1), // V2 page header
};

/**
Expand Down
Binary file not shown.
9 changes: 9 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2538,6 +2538,15 @@ def test_parquet_reader_binary_decimal(datadir):
assert_eq(expect, got)


def test_parquet_reader_rle_boolean(datadir):
fname = datadir / "rle_boolean_encoding.parquet"

expect = pd.read_parquet(fname)
got = cudf.read_parquet(fname)

assert_eq(expect, got)


# testing a specific bug-fix/edge case.
# specifically: int a parquet file containing a particular way of representing
# a list column in a schema, the cudf reader was confusing
Expand Down

0 comments on commit 55894bf

Please sign in to comment.