diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index ae001ed73391..5fef770d1514 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -200,7 +200,6 @@ pub struct PrimitiveArrayReader { rep_levels_buffer: Option, column_desc: ColumnDescPtr, record_reader: RecordReader, - _type_marker: PhantomData, } impl PrimitiveArrayReader { @@ -230,7 +229,6 @@ impl PrimitiveArrayReader { rep_levels_buffer: None, column_desc, record_reader, - _type_marker: PhantomData, }) } } diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs index 82c7ee98a9f3..0123a8d09dfb 100644 --- a/parquet/src/arrow/record_reader.rs +++ b/parquet/src/arrow/record_reader.rs @@ -16,27 +16,201 @@ // under the License. use std::cmp::{max, min}; -use std::mem::{replace, size_of}; - -use crate::column::{page::PageReader, reader::ColumnReaderImpl}; +use std::marker::PhantomData; +use std::mem::replace; +use std::ops::Range; + +use crate::arrow::record_reader::private::{ + DefinitionLevels, RecordBuffer, RepetitionLevels, +}; +use crate::column::{ + page::PageReader, + reader::{ + private::{ + ColumnLevelDecoder, ColumnLevelDecoderImpl, ColumnValueDecoder, + ColumnValueDecoderImpl, + }, + GenericColumnReader, + }, +}; use crate::data_type::DataType; -use crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::{Buffer, MutableBuffer}; +pub(crate) mod private { + use super::*; + + pub trait RecordBuffer: Sized + Default { + type Output: Sized; + + type Writer: ?Sized; + + /// Split out `len` items + fn split(&mut self, len: usize) -> Self::Output; + + /// Get a writer with `batch_size` capacity + fn writer(&mut self, batch_size: usize) -> &mut Self::Writer; + + /// Record a write of `len` items + fn commit(&mut self, len: usize); + } + + pub trait RepetitionLevels: RecordBuffer { + /// Inspects the buffered repetition levels in `range` and returns the number of + /// "complete" records along with the corresponding number of values + /// + /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 + fn count_records( + &self, + range: Range, + max_records: usize, + ) -> (usize, usize); + } + + pub trait DefinitionLevels: RecordBuffer { + /// Update the provided validity mask based on contained levels + fn update_valid_mask( + &self, + valid: &mut BooleanBufferBuilder, + range: Range, + max_level: i16, + ); + } + + pub struct TypedBuffer { + buffer: MutableBuffer, + + /// Length in elements of size T + len: usize, + + /// Placeholder to allow `T` as an invariant generic parameter + _phantom: PhantomData<*mut T>, + } + + impl Default for TypedBuffer { + fn default() -> Self { + Self { + buffer: MutableBuffer::new(0), + len: 0, + _phantom: Default::default(), + } + } + } + + impl RecordBuffer for TypedBuffer { + type Output = Buffer; + + type Writer = [T]; + + fn split(&mut self, len: usize) -> Self::Output { + let num_bytes = len * std::mem::size_of::(); + let remaining_bytes = self.buffer.len() - num_bytes; + // TODO: Optimize to reduce the copy + // create an empty buffer, as it will be resized below + let mut remaining = MutableBuffer::new(0); + remaining.resize(remaining_bytes, 0); + + let new_records = remaining.as_slice_mut(); + + new_records[0..remaining_bytes] + .copy_from_slice(&self.buffer.as_slice()[num_bytes..]); + + self.buffer.resize(num_bytes, 0); + + 
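            // `replace` swaps `remaining` (the bytes past the first `len` items) into
            // `self.buffer` and hands back the old buffer, now truncated to those first
            // `len` items, which `.into()` freezes into an immutable arrow `Buffer`.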
replace(&mut self.buffer, remaining).into() + } + + fn writer(&mut self, batch_size: usize) -> &mut Self::Writer { + self.buffer + .resize((self.len + batch_size) * std::mem::size_of::(), 0); + + let (prefix, values, suffix) = + unsafe { self.buffer.as_slice_mut().align_to_mut::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + + &mut values[self.len..self.len + batch_size] + } + + fn commit(&mut self, len: usize) { + self.len = len; + + let new_bytes = self.len * std::mem::size_of::(); + assert!(new_bytes <= self.buffer.len()); + self.buffer.resize(new_bytes, 0); + } + } + + impl RepetitionLevels for TypedBuffer { + fn count_records( + &self, + range: Range, + max_records: usize, + ) -> (usize, usize) { + let (prefix, buf, suffix) = + unsafe { self.buffer.as_slice().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + + let start = range.start; + let mut records_read = 0; + let mut end_of_last_record = start; + + for current in range { + if buf[current] == 0 && current != end_of_last_record { + records_read += 1; + end_of_last_record = current; + + if records_read == max_records { + break; + } + } + } + + (records_read, end_of_last_record - start) + } + } + + impl DefinitionLevels for TypedBuffer { + fn update_valid_mask( + &self, + null_mask: &mut BooleanBufferBuilder, + range: Range, + max_level: i16, + ) { + let (prefix, buf, suffix) = + unsafe { self.buffer.as_slice().align_to::() }; + assert!(prefix.is_empty() && suffix.is_empty()); + + for i in &buf[range] { + null_mask.append(*i == max_level) + } + } + } +} + const MIN_BATCH_SIZE: usize = 1024; /// A `RecordReader` is a stateful column reader that delimits semantic records. -pub struct RecordReader { +pub type RecordReader = GenericRecordReader< + private::TypedBuffer, + private::TypedBuffer, + private::TypedBuffer<::T>, + ColumnLevelDecoderImpl, + ColumnLevelDecoderImpl, + ColumnValueDecoderImpl, +>; + +#[doc(hidden)] +pub struct GenericRecordReader { column_desc: ColumnDescPtr, - records: MutableBuffer, - def_levels: Option, - rep_levels: Option, + records: V, + def_levels: Option, + rep_levels: Option, null_bitmap: Option, - column_reader: Option>, + column_reader: Option>, /// Number of records accumulated in records num_records: usize, @@ -47,25 +221,26 @@ pub struct RecordReader { values_written: usize, } -impl RecordReader { +impl GenericRecordReader +where + R: RepetitionLevels, + D: DefinitionLevels, + V: RecordBuffer, + CR: ColumnLevelDecoder, + CD: ColumnLevelDecoder, + CV: ColumnValueDecoder, +{ pub fn new(column_schema: ColumnDescPtr) -> Self { let (def_levels, null_map) = if column_schema.max_def_level() > 0 { - ( - Some(MutableBuffer::new(MIN_BATCH_SIZE)), - Some(BooleanBufferBuilder::new(0)), - ) + (Some(Default::default()), Some(BooleanBufferBuilder::new(0))) } else { (None, None) }; - let rep_levels = if column_schema.max_rep_level() > 0 { - Some(MutableBuffer::new(MIN_BATCH_SIZE)) - } else { - None - }; + let rep_levels = (column_schema.max_rep_level() > 0).then(Default::default); Self { - records: MutableBuffer::new(MIN_BATCH_SIZE), + records: Default::default(), def_levels, rep_levels, null_bitmap: null_map, @@ -79,8 +254,10 @@ impl RecordReader { /// Set the current page reader. 
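    /// This creates a fresh column reader with null padding enabled, so decoded
    /// values are padded with defaults to line up with the definition levels.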
pub fn set_page_reader(&mut self, page_reader: Box) -> Result<()> { - self.column_reader = - Some(ColumnReaderImpl::new(self.column_desc.clone(), page_reader)); + self.column_reader = Some(GenericColumnReader::new_null_padding( + self.column_desc.clone(), + page_reader, + )); Ok(()) } @@ -153,78 +330,26 @@ impl RecordReader { /// The implementation has side effects. It will create a new buffer to hold those /// definition level values that have already been read into memory but not counted /// as record values, e.g. those from `self.num_values` to `self.values_written`. - pub fn consume_def_levels(&mut self) -> Result> { - let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels { - let num_left_values = self.values_written - self.num_values; - // create an empty buffer, as it will be resized below - let mut new_buffer = MutableBuffer::new(0); - let num_bytes = num_left_values * size_of::(); - let new_len = self.num_values * size_of::(); - - new_buffer.resize(num_bytes, 0); - - let new_def_levels = new_buffer.as_slice_mut(); - let left_def_levels = &def_levels_buf.as_slice_mut()[new_len..]; - - new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]); - - def_levels_buf.resize(new_len, 0); - Some(new_buffer) - } else { - None - }; - - Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.into())) + pub fn consume_def_levels(&mut self) -> Result> { + Ok(match self.def_levels.as_mut() { + Some(x) => Some(x.split(self.num_values)), + None => None, + }) } /// Return repetition level data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_rep_levels(&mut self) -> Result> { - // TODO: Optimize to reduce the copy - let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels { - let num_left_values = self.values_written - self.num_values; - // create an empty buffer, as it will be resized below - let mut new_buffer = MutableBuffer::new(0); - let num_bytes = num_left_values * size_of::(); - let new_len = self.num_values * size_of::(); - - new_buffer.resize(num_bytes, 0); - - let new_rep_levels = new_buffer.as_slice_mut(); - let left_rep_levels = &rep_levels_buf.as_slice_mut()[new_len..]; - - new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]); - - rep_levels_buf.resize(new_len, 0); - - Some(new_buffer) - } else { - None - }; - - Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.into())) + pub fn consume_rep_levels(&mut self) -> Result> { + Ok(match self.rep_levels.as_mut() { + Some(x) => Some(x.split(self.num_values)), + None => None, + }) } /// Returns currently stored buffer data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_record_data(&mut self) -> Result { - // TODO: Optimize to reduce the copy - let num_left_values = self.values_written - self.num_values; - // create an empty buffer, as it will be resized below - let mut new_buffer = MutableBuffer::new(0); - let num_bytes = num_left_values * T::get_type_size(); - let new_len = self.num_values * T::get_type_size(); - - new_buffer.resize(num_bytes, 0); - - let new_records = new_buffer.as_slice_mut(); - let left_records = &mut self.records.as_slice_mut()[new_len..]; - - new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]); - - self.records.resize(new_len, 0); - - Ok(replace(&mut self.records, new_buffer).into()) + pub fn consume_record_data(&mut self) -> Result { + Ok(self.records.split(self.num_values)) } /// Returns currently stored null bitmap data. 
@@ -275,37 +400,19 @@ impl RecordReader { /// Try to read one batch of data. fn read_one_batch(&mut self, batch_size: usize) -> Result { - // Reserve spaces - self.records - .resize(self.records.len() + batch_size * T::get_type_size(), 0); - if let Some(ref mut buf) = self.rep_levels { - buf.resize(buf.len() + batch_size * size_of::(), 0); - } - if let Some(ref mut buf) = self.def_levels { - buf.resize(buf.len() + batch_size * size_of::(), 0); - } - let values_written = self.values_written; - // Convert mutable buffer spaces to mutable slices - let (prefix, values, suffix) = - unsafe { self.records.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - let values = &mut values[values_written..]; + let rep_levels = self + .rep_levels + .as_mut() + .map(|levels| levels.writer(batch_size)); - let def_levels = self.def_levels.as_mut().map(|buf| { - let (prefix, def_levels, suffix) = - unsafe { buf.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &mut def_levels[values_written..] - }); + let def_levels = self + .def_levels + .as_mut() + .map(|levels| levels.writer(batch_size)); - let rep_levels = self.rep_levels.as_mut().map(|buf| { - let (prefix, rep_levels, suffix) = - unsafe { buf.as_slice_mut().align_to_mut::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &mut rep_levels[values_written..] - }); + let values = self.records.writer(batch_size); let (values_read, levels_read) = self .column_reader @@ -313,54 +420,20 @@ impl RecordReader { .unwrap() .read_batch(batch_size, def_levels, rep_levels, values)?; - // get new references for the def levels. - let def_levels = self.def_levels.as_ref().map(|buf| { - let (prefix, def_levels, suffix) = - unsafe { buf.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - &def_levels[values_written..] - }); - - let max_def_level = self.column_desc.max_def_level(); - - if values_read < levels_read { - let def_levels = def_levels.ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" - ) - })?; - - // Fill spaces in column data with default values - let mut values_pos = values_read; - let mut level_pos = levels_read; - - while level_pos > values_pos { - if def_levels[level_pos - 1] == max_def_level { - // This values is not empty - // We use swap rather than assign here because T::T doesn't - // implement Copy - values.swap(level_pos - 1, values_pos - 1); - values_pos -= 1; - } else { - values[level_pos - 1] = T::T::default(); - } + if let Some(null_bitmap) = self.null_bitmap.as_mut() { + let def_levels = self + .def_levels + .as_mut() + .expect("definition levels should exist"); - level_pos -= 1; - } - } - - // Fill in bitmap data - if let Some(null_buffer) = self.null_bitmap.as_mut() { - let def_levels = def_levels.ok_or_else(|| { - general_err!( - "Definition levels should exist when data is less than levels!" 
- ) - })?; - (0..levels_read) - .for_each(|idx| null_buffer.append(def_levels[idx] == max_def_level)); + def_levels.update_valid_mask( + null_bitmap, + values_written..values_written + levels_read, + self.column_desc.max_def_level(), + ) } - let values_read = max(values_read, levels_read); + let values_read = max(levels_read, values_read); self.set_values_written(self.values_written + values_read)?; Ok(values_read) } @@ -370,32 +443,9 @@ impl RecordReader { /// /// A "complete" record is one where the buffer contains a subsequent repetition level of 0 fn count_records(&self, records_to_read: usize) -> (usize, usize) { - let rep_levels = self.rep_levels.as_ref().map(|buf| { - let (prefix, rep_levels, suffix) = - unsafe { buf.as_slice().align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - rep_levels - }); - - match rep_levels { + match self.rep_levels.as_ref() { Some(buf) => { - let mut records_read = 0; - let mut end_of_last_record = self.num_values; - - for current in self.num_values..self.values_written { - if buf[current] == 0 { - if current != end_of_last_record { - records_read += 1; - end_of_last_record = current; - - if records_read == records_to_read { - break; - } - } - } - } - - (records_read, end_of_last_record - self.num_values) + buf.count_records(self.num_values..self.values_written, records_to_read) } None => { let records_read = @@ -409,17 +459,14 @@ impl RecordReader { #[allow(clippy::unnecessary_wraps)] fn set_values_written(&mut self, new_values_written: usize) -> Result<()> { self.values_written = new_values_written; - self.records - .resize(self.values_written * T::get_type_size(), 0); - - let new_levels_len = self.values_written * size_of::(); + self.records.commit(self.values_written); if let Some(ref mut buf) = self.rep_levels { - buf.resize(new_levels_len, 0) + buf.commit(self.values_written) }; if let Some(ref mut buf) = self.def_levels { - buf.resize(new_levels_len, 0) + buf.commit(self.values_written) }; Ok(()) diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 63be17b7dd1f..ee1e20a69282 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -20,17 +20,20 @@ use std::{ cmp::{max, min}, collections::HashMap, + ops::Range, }; use super::page::{Page, PageReader}; use crate::basic::*; -use crate::data_type::*; -use crate::encodings::{ - decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}, - levels::LevelDecoder, +use crate::column::reader::private::{ + ColumnLevelDecoder, ColumnValueDecoder, LevelsWriter, ValuesWriter, }; +use crate::data_type::*; +use crate::encodings::decoding::{get_decoder, Decoder, DictDecoder, PlainDecoder}; +use crate::encodings::rle::RleDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; +use crate::util::bit_util::{ceil, BitReader}; use crate::util::memory::ByteBufferPtr; /// Column reader for a Parquet type. 
@@ -101,37 +104,319 @@ pub fn get_typed_column_reader( }) } +pub(crate) mod private { + use super::*; + + /// A type that can have level data written to it by a [`ColumnLevelDecoder`] + pub trait LevelsWriter { + fn capacity(&self) -> usize; + + fn get(&self, idx: usize) -> i16; + } + + impl LevelsWriter for [i16] { + fn capacity(&self) -> usize { + self.len() + } + + fn get(&self, idx: usize) -> i16 { + self[idx] + } + } + + /// A type that can have value data written to it by a [`ColumnValueDecoder`] + pub trait ValuesWriter { + fn capacity(&self) -> usize; + } + + impl ValuesWriter for [T] { + fn capacity(&self) -> usize { + self.len() + } + } + + /// Decodes level data to a [`LevelsWriter`] + pub trait ColumnLevelDecoder { + type Writer: LevelsWriter + ?Sized; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self; + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result; + } + + /// Decodes value data to a [`ValuesWriter`] + pub trait ColumnValueDecoder { + type Writer: ValuesWriter + ?Sized; + + fn create(col: &ColumnDescPtr, pad_nulls: bool) -> Self; + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + encoding: Encoding, + is_sorted: bool, + ) -> Result<()>; + + fn set_data( + &mut self, + encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()>; + + fn read( + &mut self, + out: &mut Self::Writer, + levels: Range, + values_read: usize, + is_valid: impl Fn(usize) -> bool, + ) -> Result; + } + + /// An implementation of [`ColumnValueDecoder`] for `[T::T]` + pub struct ColumnValueDecoderImpl { + descr: ColumnDescPtr, + + pad_nulls: bool, + + current_encoding: Option, + + // Cache of decoders for existing encodings + decoders: HashMap>>, + } + + impl ColumnValueDecoder for ColumnValueDecoderImpl { + type Writer = [T::T]; + + fn create(descr: &ColumnDescPtr, pad_nulls: bool) -> Self { + Self { + descr: descr.clone(), + pad_nulls, + current_encoding: None, + decoders: Default::default(), + } + } + + fn set_dict( + &mut self, + buf: ByteBufferPtr, + num_values: u32, + mut encoding: Encoding, + _is_sorted: bool, + ) -> Result<()> { + if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY + } + + if self.decoders.contains_key(&encoding) { + return Err(general_err!("Column cannot have more than one dictionary")); + } + + if encoding == Encoding::RLE_DICTIONARY { + let mut dictionary = PlainDecoder::::new(self.descr.type_length()); + dictionary.set_data(buf, num_values as usize)?; + + let mut decoder = DictDecoder::new(); + decoder.set_dict(Box::new(dictionary))?; + self.decoders.insert(encoding, Box::new(decoder)); + Ok(()) + } else { + Err(nyi_err!( + "Invalid/Unsupported encoding type for dictionary: {}", + encoding + )) + } + } + + fn set_data( + &mut self, + mut encoding: Encoding, + data: ByteBufferPtr, + num_values: usize, + ) -> Result<()> { + if encoding == Encoding::PLAIN_DICTIONARY { + encoding = Encoding::RLE_DICTIONARY; + } + + let decoder = if encoding == Encoding::RLE_DICTIONARY { + self.decoders + .get_mut(&encoding) + .expect("Decoder for dict should have been set") + } else { + // Search cache for data page decoder + #[allow(clippy::map_entry)] + if !self.decoders.contains_key(&encoding) { + // Initialize decoder for this page + let data_decoder = get_decoder::(self.descr.clone(), encoding)?; + self.decoders.insert(encoding, data_decoder); + } + self.decoders.get_mut(&encoding).unwrap() + }; + + decoder.set_data(data, num_values)?; + 
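            // Remember which encoding this page uses so `read` can pick the
            // matching decoder back out of the cache.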
self.current_encoding = Some(encoding); + Ok(()) + } + + fn read( + &mut self, + out: &mut Self::Writer, + levels: Range, + values_read: usize, + is_valid: impl Fn(usize) -> bool, + ) -> Result { + let encoding = self + .current_encoding + .expect("current_encoding should be set"); + + let current_decoder = self.decoders.get_mut(&encoding).unwrap_or_else(|| { + panic!("decoder for encoding {} should be set", encoding) + }); + + let values_to_read = levels.clone().filter(|x| is_valid(*x)).count(); + + match self.pad_nulls { + true => { + // Read into start of buffer + let values_read = current_decoder + .get(&mut out[levels.start..levels.start + values_to_read])?; + + if values_read != values_to_read { + return Err(general_err!("insufficient values in page")); + } + + // Shuffle nulls + let mut values_pos = levels.start + values_to_read; + let mut level_pos = levels.end; + + while level_pos > values_pos { + if is_valid(level_pos - 1) { + // This values is not empty + // We use swap rather than assign here because T::T doesn't + // implement Copy + out.swap(level_pos - 1, values_pos - 1); + values_pos -= 1; + } else { + out[level_pos - 1] = T::T::default(); + } + + level_pos -= 1; + } + + Ok(values_read) + } + false => current_decoder + .get(&mut out[values_read..values_read + values_to_read]), + } + } + } + + /// An implementation of [`ColumnLevelDecoder`] for `[i16]` + pub struct ColumnLevelDecoderImpl { + inner: LevelDecoderInner, + } + + enum LevelDecoderInner { + Packed(BitReader, u8), + /// Boxed as `RleDecoder` contains an inline buffer + Rle(Box), + } + + impl ColumnLevelDecoder for ColumnLevelDecoderImpl { + type Writer = [i16]; + + fn create(max_level: i16, encoding: Encoding, data: ByteBufferPtr) -> Self { + let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; + match encoding { + Encoding::RLE => { + let mut decoder = Box::new(RleDecoder::new(bit_width)); + decoder.set_data(data); + Self { + inner: LevelDecoderInner::Rle(decoder), + } + } + Encoding::BIT_PACKED => Self { + inner: LevelDecoderInner::Packed(BitReader::new(data), bit_width), + }, + _ => unreachable!("invalid level encoding: {}", encoding), + } + } + + fn read(&mut self, out: &mut Self::Writer, range: Range) -> Result { + match &mut self.inner { + LevelDecoderInner::Packed(reader, bit_width) => { + Ok(reader.get_batch::(&mut out[range], *bit_width as usize)) + } + LevelDecoderInner::Rle(reader) => reader.get_batch(&mut out[range]), + } + } + } +} + /// Typed value reader for a particular primitive column. -pub struct ColumnReaderImpl { +pub type ColumnReaderImpl = GenericColumnReader< + private::ColumnLevelDecoderImpl, + private::ColumnLevelDecoderImpl, + private::ColumnValueDecoderImpl, +>; + +#[doc(hidden)] +pub struct GenericColumnReader { descr: ColumnDescPtr, - def_level_decoder: Option, - rep_level_decoder: Option, + page_reader: Box, - current_encoding: Option, - // The total number of values stored in the data page. + /// The total number of values stored in the data page. num_buffered_values: u32, - // The number of values from the current data page that has been decoded into memory - // so far. + /// The number of values from the current data page that has been decoded into memory + /// so far. 
num_decoded_values: u32, - // Cache of decoders for existing encodings - decoders: HashMap>>, + /// The decoder for the definition levels if any + def_level_decoder: Option, + + /// The decoder for the repetition levels if any + rep_level_decoder: Option, + + /// The decoder for the values + values_decoder: V, } -impl ColumnReaderImpl { +impl GenericColumnReader +where + R: ColumnLevelDecoder, + D: ColumnLevelDecoder, + V: ColumnValueDecoder, +{ /// Creates new column reader based on column descriptor and page reader. pub fn new(descr: ColumnDescPtr, page_reader: Box) -> Self { + let values_decoder = V::create(&descr, false); + Self::new_with_decoder(descr, page_reader, values_decoder) + } + + pub(crate) fn new_null_padding( + descr: ColumnDescPtr, + page_reader: Box, + ) -> Self { + let values_decoder = V::create(&descr, true); + Self::new_with_decoder(descr, page_reader, values_decoder) + } + + fn new_with_decoder( + descr: ColumnDescPtr, + page_reader: Box, + values_decoder: V, + ) -> Self { Self { descr, def_level_decoder: None, rep_level_decoder: None, page_reader, - current_encoding: None, num_buffered_values: 0, num_decoded_values: 0, - decoders: HashMap::new(), + values_decoder, } } @@ -159,20 +444,20 @@ impl ColumnReaderImpl { pub fn read_batch( &mut self, batch_size: usize, - mut def_levels: Option<&mut [i16]>, - mut rep_levels: Option<&mut [i16]>, - values: &mut [T::T], + mut def_levels: Option<&mut D::Writer>, + mut rep_levels: Option<&mut R::Writer>, + values: &mut V::Writer, ) -> Result<(usize, usize)> { let mut values_read = 0; let mut levels_read = 0; // Compute the smallest batch size we can read based on provided slices - let mut batch_size = min(batch_size, values.len()); + let mut batch_size = min(batch_size, values.capacity()); if let Some(ref levels) = def_levels { - batch_size = min(batch_size, levels.len()); + batch_size = min(batch_size, levels.capacity()); } if let Some(ref levels) = rep_levels { - batch_size = min(batch_size, levels.len()); + batch_size = min(batch_size, levels.capacity()); } // Read exhaustively all pages until we read all batch_size values/levels @@ -200,57 +485,57 @@ impl ColumnReaderImpl { adjusted_size }; - let mut values_to_read = 0; - let mut num_def_levels = 0; - let mut num_rep_levels = 0; - // If the field is required and non-repeated, there are no definition levels - if self.descr.max_def_level() > 0 && def_levels.as_ref().is_some() { - if let Some(ref mut levels) = def_levels { - num_def_levels = self.read_def_levels( - &mut levels[levels_read..levels_read + iter_batch_size], - )?; - for i in levels_read..levels_read + num_def_levels { - if levels[i] == self.descr.max_def_level() { - values_to_read += 1; - } - } - } - } else { - // If max definition level == 0, then it is REQUIRED field, read all - // values. If definition levels are not provided, we still - // read all values. 
- values_to_read = iter_batch_size; - } + let num_def_levels = match def_levels.as_mut() { + Some(levels) if self.descr.max_def_level() > 0 => self + .def_level_decoder + .as_mut() + .expect("def_level_decoder be set") + .read(*levels, levels_read..levels_read + iter_batch_size)?, + _ => 0, + }; - if self.descr.max_rep_level() > 0 && rep_levels.is_some() { - if let Some(ref mut levels) = rep_levels { - num_rep_levels = self.read_rep_levels( - &mut levels[levels_read..levels_read + iter_batch_size], - )?; - - // If definition levels are defined, check that rep levels == def - // levels - if def_levels.is_some() { - assert_eq!( - num_def_levels, num_rep_levels, - "Number of decoded rep / def levels did not match" - ); - } - } - } + let num_rep_levels = match rep_levels.as_mut() { + Some(levels) if self.descr.max_rep_level() > 0 => self + .rep_level_decoder + .as_mut() + .expect("rep_level_decoder be set") + .read(levels, levels_read..levels_read + iter_batch_size)?, + _ => 0, + }; // At this point we have read values, definition and repetition levels. // If both definition and repetition levels are defined, their counts // should be equal. Values count is always less or equal to definition levels. - // + if num_def_levels != 0 && num_rep_levels != 0 { + assert_eq!( + num_def_levels, num_rep_levels, + "Number of decoded rep / def levels did not match" + ); + } + // Note that if field is not required, but no definition levels are provided, // we would read values of batch size and (if provided, of course) repetition // levels of batch size - [!] they will not be synced, because only definition // levels enforce number of non-null values to read. - let curr_values_read = - self.read_values(&mut values[values_read..values_read + values_to_read])?; + let curr_values_read = match def_levels.as_ref() { + Some(def_levels) => { + let max_def_level = self.descr.max_def_level(); + self.values_decoder.read( + values, + levels_read..levels_read + iter_batch_size, + values_read, + |x| def_levels.get(x) == max_def_level, + )? + } + None => self.values_decoder.read( + values, + levels_read..levels_read + iter_batch_size, + values_read, + |_| true, + )?, + }; // Update all "return" counters and internal state. @@ -275,8 +560,14 @@ impl ColumnReaderImpl { Some(current_page) => { match current_page { // 1. Dictionary page: configure dictionary for this page. - p @ Page::DictionaryPage { .. } => { - self.configure_dictionary(p)?; + Page::DictionaryPage { + buf, + num_values, + encoding, + is_sorted, + } => { + self.values_decoder + .set_dict(buf, num_values, encoding, is_sorted)?; continue; } // 2. 
Data page v1 @@ -291,40 +582,50 @@ impl ColumnReaderImpl { self.num_buffered_values = num_values; self.num_decoded_values = 0; - let mut buffer_ptr = buf; + let max_rep_level = self.descr.max_rep_level(); + let max_def_level = self.descr.max_def_level(); - if self.descr.max_rep_level() > 0 { - let mut rep_decoder = LevelDecoder::v1( + let mut offset = 0; + + if max_rep_level > 0 { + let level_data = parse_v1_level( + max_rep_level, + num_values, rep_level_encoding, - self.descr.max_rep_level(), - ); - let total_bytes = rep_decoder.set_data( - self.num_buffered_values as usize, - buffer_ptr.all(), + buf.start_from(offset), + )?; + offset = level_data.end(); + + let decoder = R::create( + max_rep_level, + rep_level_encoding, + level_data, ); - buffer_ptr = buffer_ptr.start_from(total_bytes); - self.rep_level_decoder = Some(rep_decoder); + + self.rep_level_decoder = Some(decoder); } - if self.descr.max_def_level() > 0 { - let mut def_decoder = LevelDecoder::v1( + if max_def_level > 0 { + let level_data = parse_v1_level( + max_def_level, + num_values, def_level_encoding, - self.descr.max_def_level(), - ); - let total_bytes = def_decoder.set_data( - self.num_buffered_values as usize, - buffer_ptr.all(), + buf.start_from(offset), + )?; + offset = level_data.end(); + + let decoder = D::create( + max_def_level, + def_level_encoding, + level_data, ); - buffer_ptr = buffer_ptr.start_from(total_bytes); - self.def_level_decoder = Some(def_decoder); + + self.def_level_decoder = Some(decoder); } - // Data page v1 does not have offset, all content of buffer - // should be passed - self.set_current_page_encoding( + self.values_decoder.set_data( encoding, - &buffer_ptr, - 0, + buf.start_from(offset), num_values as usize, )?; return Ok(true); @@ -344,42 +645,36 @@ impl ColumnReaderImpl { self.num_buffered_values = num_values; self.num_decoded_values = 0; - let mut offset = 0; - // DataPage v2 only supports RLE encoding for repetition // levels if self.descr.max_rep_level() > 0 { - let mut rep_decoder = - LevelDecoder::v2(self.descr.max_rep_level()); - let bytes_read = rep_decoder.set_data_range( - self.num_buffered_values as usize, - &buf, - offset, - rep_levels_byte_len as usize, + let decoder = R::create( + self.descr.max_rep_level(), + Encoding::RLE, + buf.range(0, rep_levels_byte_len as usize), ); - offset += bytes_read; - self.rep_level_decoder = Some(rep_decoder); + self.rep_level_decoder = Some(decoder); } // DataPage v2 only supports RLE encoding for definition // levels if self.descr.max_def_level() > 0 { - let mut def_decoder = - LevelDecoder::v2(self.descr.max_def_level()); - let bytes_read = def_decoder.set_data_range( - self.num_buffered_values as usize, - &buf, - offset, - def_levels_byte_len as usize, + let decoder = D::create( + self.descr.max_def_level(), + Encoding::RLE, + buf.range( + rep_levels_byte_len as usize, + def_levels_byte_len as usize, + ), ); - offset += bytes_read; - self.def_level_decoder = Some(def_decoder); + self.def_level_decoder = Some(decoder); } - self.set_current_page_encoding( + self.values_decoder.set_data( encoding, - &buf, - offset, + buf.start_from( + (rep_levels_byte_len + def_levels_byte_len) as usize, + ), num_values as usize, )?; return Ok(true); @@ -392,38 +687,6 @@ impl ColumnReaderImpl { Ok(true) } - /// Resolves and updates encoding and set decoder for the current page - fn set_current_page_encoding( - &mut self, - mut encoding: Encoding, - buffer_ptr: &ByteBufferPtr, - offset: usize, - len: usize, - ) -> Result<()> { - if encoding == 
Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY; - } - - let decoder = if encoding == Encoding::RLE_DICTIONARY { - self.decoders - .get_mut(&encoding) - .expect("Decoder for dict should have been set") - } else { - // Search cache for data page decoder - #[allow(clippy::map_entry)] - if !self.decoders.contains_key(&encoding) { - // Initialize decoder for this page - let data_decoder = get_decoder::(self.descr.clone(), encoding)?; - self.decoders.insert(encoding, data_decoder); - } - self.decoders.get_mut(&encoding).unwrap() - }; - - decoder.set_data(buffer_ptr.start_from(offset), len as usize)?; - self.current_encoding = Some(encoding); - Ok(()) - } - #[inline] fn has_next(&mut self) -> Result { if self.num_buffered_values == 0 @@ -440,63 +703,29 @@ impl ColumnReaderImpl { Ok(true) } } +} - #[inline] - fn read_rep_levels(&mut self, buffer: &mut [i16]) -> Result { - let level_decoder = self - .rep_level_decoder - .as_mut() - .expect("rep_level_decoder be set"); - level_decoder.get(buffer) - } - - #[inline] - fn read_def_levels(&mut self, buffer: &mut [i16]) -> Result { - let level_decoder = self - .def_level_decoder - .as_mut() - .expect("def_level_decoder be set"); - level_decoder.get(buffer) - } - - #[inline] - fn read_values(&mut self, buffer: &mut [T::T]) -> Result { - let encoding = self - .current_encoding - .expect("current_encoding should be set"); - let current_decoder = self - .decoders - .get_mut(&encoding) - .unwrap_or_else(|| panic!("decoder for encoding {} should be set", encoding)); - current_decoder.get(buffer) - } - - #[inline] - fn configure_dictionary(&mut self, page: Page) -> Result { - let mut encoding = page.encoding(); - if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY { - encoding = Encoding::RLE_DICTIONARY - } - - if self.decoders.contains_key(&encoding) { - return Err(general_err!("Column cannot have more than one dictionary")); +fn parse_v1_level( + max_level: i16, + num_buffered_values: u32, + encoding: Encoding, + buf: ByteBufferPtr, +) -> Result { + match encoding { + Encoding::RLE => { + let i32_size = std::mem::size_of::(); + let data_size = read_num_bytes!(i32, i32_size, buf.as_ref()) as usize; + Ok(buf.range(i32_size, data_size)) } - - if encoding == Encoding::RLE_DICTIONARY { - let mut dictionary = PlainDecoder::::new(self.descr.type_length()); - let num_values = page.num_values(); - dictionary.set_data(page.buffer().clone(), num_values as usize)?; - - let mut decoder = DictDecoder::new(); - decoder.set_dict(Box::new(dictionary))?; - self.decoders.insert(encoding, Box::new(decoder)); - Ok(true) - } else { - Err(nyi_err!( - "Invalid/Unsupported encoding type for dictionary: {}", - encoding - )) + Encoding::BIT_PACKED => { + let bit_width = crate::util::bit_util::log2(max_level as u64 + 1) as u8; + let num_bytes = ceil( + (num_buffered_values as usize * bit_width as usize) as i64, + 8, + ); + Ok(buf.range(0, num_bytes as usize)) } + _ => Err(general_err!("invalid level encoding: {}", encoding)), } } diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs index a9d0ba6a3d85..923c45db14d0 100644 --- a/parquet/src/util/memory.rs +++ b/parquet/src/util/memory.rs @@ -328,6 +328,12 @@ impl BufferPtr { self.start } + /// Returns the end position of this buffer + #[inline] + pub fn end(&self) -> usize { + self.start + self.len + } + /// Returns length of this buffer #[inline] pub fn len(&self) -> usize {
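A minimal, self-contained sketch (not part of the diff) of how repetition levels delimit records, mirroring the `count_records` logic added above; `count_complete_records` is a hypothetical free function used only for illustration:

// Counts "complete" records in a slice of repetition levels: a record is
// complete only once a later repetition level of 0 marks the start of the
// next record. Returns (records counted, values they span).
fn count_complete_records(rep_levels: &[i16], max_records: usize) -> (usize, usize) {
    let mut records_read = 0;
    let mut end_of_last_record = 0;
    for current in 0..rep_levels.len() {
        if rep_levels[current] == 0 && current != end_of_last_record {
            records_read += 1;
            end_of_last_record = current;
            if records_read == max_records {
                break;
            }
        }
    }
    (records_read, end_of_last_record)
}

fn main() {
    // Records start at indices 0, 3 and 5. Only the first two are complete:
    // the record starting at index 5 is not followed by another level of 0,
    // so its values stay buffered until more data is read.
    let rep_levels = [0i16, 1, 1, 0, 1, 0];
    assert_eq!(count_complete_records(&rep_levels, usize::MAX), (2, 5));
    // Limiting to one record stops after the first complete record (3 values).
    assert_eq!(count_complete_records(&rep_levels, 1), (1, 3));
}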