diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 711f3a61557d..64a112101f3c 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -419,6 +419,11 @@ impl RleDecoder { &mut buffer[values_read..values_read + num_values], self.bit_width as usize, ); + if num_values == 0 { + // Handle writers which truncate the final block + self.bit_packed_left = 0; + continue; + } self.bit_packed_left -= num_values as u32; values_read += num_values; } else if !self.reload() { @@ -467,6 +472,11 @@ impl RleDecoder { &mut index_buf[..num_values], self.bit_width as usize, ); + if num_values == 0 { + // Handle writers which truncate the final block + self.bit_packed_left = 0; + break; + } for i in 0..num_values { buffer[values_read + i].clone_from(&dict[index_buf[i] as usize]) } @@ -743,6 +753,42 @@ mod tests { } } + #[test] + fn test_truncated_rle() { + // The final bit packed run within a page may not be a multiple of 8 values + // Unfortunately the specification stores `(bit-packed-run-len) / 8` + // This means we don't necessarily know how many values are present + // and some writers may not add padding to compensate for this ambiguity + + // Bit pack encode 20 values with a bit width of 8 + let mut data: Vec = vec![ + (3 << 1) | 1, // bit-packed run of 3 * 8 + ]; + data.extend(std::iter::repeat(0xFF).take(20)); + let data = ByteBufferPtr::new(data); + + let mut decoder = RleDecoder::new(8); + decoder.set_data(data.clone()); + + let mut output = vec![0_u16; 100]; + let read = decoder.get_batch(&mut output).unwrap(); + + assert_eq!(read, 20); + assert!(output.iter().take(20).all(|x| *x == 255)); + + // Reset decoder + decoder.set_data(data); + + let dict: Vec = (0..256).collect(); + let mut output = vec![0_u16; 100]; + let read = decoder + .get_batch_with_dict(&dict, &mut output, 100) + .unwrap(); + + assert_eq!(read, 20); + assert!(output.iter().take(20).all(|x| *x == 255)); + } + #[test] fn test_rle_specific_roundtrip() { let bit_width = 1;