Skip to content

Commit

Permalink
partial nested [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 9, 2024
1 parent 71e549e commit e1eeb04
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use super::utils::*;
use crate::parquet::deserialize::SliceFilteredIter;
use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::page::{split_buffer, DataPage};
use crate::read::deserialize::utils::{page_is_filtered, page_is_optional};
use crate::read::ParquetError;

pub(crate) type BinaryDict = BinaryArray<i64>;
Expand Down Expand Up @@ -373,3 +374,53 @@ pub(crate) fn build_binary_state<'a>(
_ => Err(utils::not_implemented(page)),
}
}

#[derive(Debug)]
pub(crate) enum BinaryNestedState<'a> {
Optional(BinaryIter<'a>),
Required(BinaryIter<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for BinaryNestedState<'a> {
fn len(&self) -> usize {
match self {
BinaryNestedState::Optional(validity) => validity.size_hint().0,
BinaryNestedState::Required(state) => state.size_hint().0,
BinaryNestedState::RequiredDictionary(required) => required.len(),
BinaryNestedState::OptionalDictionary(optional) => optional.len(),
}
}
}

pub(crate) fn build_nested_state<'a>(page: &'a DataPage, dict: Option<&'a BinaryDict>) -> PolarsResult<BinaryNestedState<'a>> {
let is_optional = page_is_optional(page);
let is_filtered = page_is_filtered(page);

match (page.encoding(), dict, is_optional, is_filtered) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
ValuesDictionary::try_new(page, dict).map(BinaryNestedState::RequiredDictionary)
},
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
ValuesDictionary::try_new(page, dict).map(BinaryNestedState::OptionalDictionary)
},
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(BinaryNestedState::Optional(values))
},
(Encoding::Plain, _, false, false) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(BinaryNestedState::Required(values))
},
_ => Err(utils::not_implemented(page)),
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ mod basic;
pub(super) mod decoders;
mod dictionary;
mod nested;
mod utils;
pub(super) mod utils;

pub use basic::BinaryArrayIter;
pub use dictionary::{DictIter, NestedDictIter};
Expand Down
62 changes: 7 additions & 55 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,20 @@ use arrow::offset::Offset;
use polars_error::PolarsResult;

use super::super::nested_utils::*;
use super::super::utils;
use super::super::utils::MaybeNext;
use super::basic::finish;
use super::decoders::*;
use super::utils::*;
use crate::arrow::read::PagesIter;
use crate::parquet::encoding::Encoding;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::parquet::schema::Repetition;

#[derive(Debug)]
enum State<'a> {
Optional(BinaryIter<'a>),
Required(BinaryIter<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
fn len(&self) -> usize {
match self {
State::Optional(validity) => validity.size_hint().0,
State::Required(state) => state.size_hint().0,
State::RequiredDictionary(required) => required.len(),
State::OptionalDictionary(optional) => optional.len(),
}
}
}
use crate::parquet::page::{DataPage, DictPage};

#[derive(Debug, Default)]
struct BinaryDecoder<O: Offset> {
phantom_o: std::marker::PhantomData<O>,
}

impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
type State = State<'a>;
type State = BinaryNestedState<'a>;
type Dictionary = BinaryDict;
type DecodedState = (Binary<O>, MutableBitmap);

Expand All @@ -51,33 +29,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
page: &'a DataPage,
dict: Option<&'a Self::Dictionary>,
) -> PolarsResult<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), dict, is_optional, is_filtered) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
ValuesDictionary::try_new(page, dict).map(State::RequiredDictionary)
},
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
ValuesDictionary::try_new(page, dict).map(State::OptionalDictionary)
},
(Encoding::Plain, _, true, false) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Optional(values))
},
(Encoding::Plain, _, false, false) => {
let (_, _, values) = split_buffer(page)?;

let values = BinaryIter::new(values);

Ok(State::Required(values))
},
_ => Err(utils::not_implemented(page)),
}
build_nested_state(page, dict)
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
Expand All @@ -94,16 +46,16 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
) -> PolarsResult<()> {
let (values, validity) = decoded;
match state {
State::Optional(page) => {
BinaryNestedState::Optional(page) => {
let value = page.next().unwrap_or_default();
values.push(value);
validity.push(true);
},
State::Required(page) => {
BinaryNestedState::Required(page) => {
let value = page.next().unwrap_or_default();
values.push(value);
},
State::RequiredDictionary(page) => {
BinaryNestedState::RequiredDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
Expand All @@ -112,7 +64,7 @@ impl<'a, O: Offset> NestedDecoder<'a> for BinaryDecoder<O> {
.unwrap_or_default();
values.push(item);
},
State::OptionalDictionary(page) => {
BinaryNestedState::OptionalDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ impl<'a> Iterator for BinaryIter<'a> {
return None;
}
let (length, remaining) = self.values.split_at(4);
let length = u32::from_le_bytes(length.try_into().unwrap()) as usize;
let length: [u8; 4] = unsafe { length.try_into().unwrap_unchecked() };
let length = u32::from_le_bytes(length) as usize;
let (result, remaining) = remaining.split_at(length);
self.values = remaining;
Some(result)
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
mod basic;
mod nested;

pub use nested::NestedIter;
pub use basic::BinaryViewArrayIter;
141 changes: 141 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/binview/nested.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
use std::collections::VecDeque;
use arrow::array::{ArrayRef, MutableBinaryViewArray};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::binary::decoders::{BinaryDict, BinaryNestedState, build_nested_state, deserialize_plain};
use crate::read::deserialize::nested_utils::{NestedDecoder, next};
use crate::read::{InitNested, NestedState, PagesIter};
use crate::read::deserialize::binview::basic::finish;
use crate::read::deserialize::utils::MaybeNext;

#[derive(Debug, Default)]
struct BinViewDecoder {
}

type DecodedStateTuple = (MutableBinaryViewArray<[u8]>, MutableBitmap);

impl<'a> NestedDecoder<'a> for BinViewDecoder {
type State = BinaryNestedState<'a>;
type Dictionary = BinaryDict;
type DecodedState = DecodedStateTuple;

fn build_state(
&self,
page: &'a DataPage,
dict: Option<&'a Self::Dictionary>,
) -> PolarsResult<Self::State> {
build_nested_state(page, dict)
}

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
MutableBinaryViewArray::with_capacity(capacity),
MutableBitmap::with_capacity(capacity),
)
}

fn push_valid(&self, state: &mut Self::State, decoded: &mut Self::DecodedState) -> PolarsResult<()> {
let (values, validity) = decoded;
match state {
BinaryNestedState::Optional(page) => {
let value = page.next().unwrap_or_default();
values.push_value_ignore_validity(value);
validity.push(true);
},
BinaryNestedState::Required(page) => {
let value = page.next().unwrap_or_default();
values.push_value_ignore_validity(value);
},
BinaryNestedState::RequiredDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
.next()
.map(|index| dict_values.value(index.unwrap() as usize))
.unwrap_or_default();
values.push_value_ignore_validity(item);
},
BinaryNestedState::OptionalDictionary(page) => {
let dict_values = &page.dict;
let item = page
.values
.next()
.map(|index| dict_values.value(index.unwrap() as usize))
.unwrap_or_default();
values.push_value_ignore_validity(item);
validity.push(true);
},
}
Ok(())
}

fn push_null(&self, decoded: &mut Self::DecodedState) {
let (values, validity) = decoded;
values.push_null();
validity.push(false);
}

fn deserialize_dict(&self, page: &DictPage) -> Self::Dictionary {
deserialize_plain(&page.buffer, page.num_values)
}
}

pub struct NestedIter<I: PagesIter> {
iter: I,
data_type: ArrowDataType,
init: Vec<InitNested>,
items: VecDeque<(NestedState, DecodedStateTuple)>,
dict: Option<BinaryDict>,
chunk_size: Option<usize>,
remaining: usize,
}

impl<I: PagesIter> NestedIter<I> {
pub fn new(
iter: I,
init: Vec<InitNested>,
data_type: ArrowDataType,
num_rows: usize,
chunk_size: Option<usize>,
) -> Self {
Self {
iter,
data_type,
init,
items: VecDeque::new(),
dict: None,
chunk_size,
remaining: num_rows,
}
}
}

impl<I: PagesIter> Iterator for NestedIter<I> {
type Item = PolarsResult<(NestedState, ArrayRef)>;

fn next(&mut self) -> Option<Self::Item> {
loop {
let maybe_state = next(
&mut self.iter,
&mut self.items,
&mut self.dict,
&mut self.remaining,
&self.init,
self.chunk_size,
&BinViewDecoder::default(),
);
match maybe_state {
MaybeNext::Some(Ok((nested, decoded))) => {
return Some(
finish(&self.data_type, decoded.0, decoded.1).map(|array| (nested, array)),
)
},
MaybeNext::Some(Err(e)) => return Some(Err(e)),
MaybeNext::None => return None,
MaybeNext::More => continue, // Using continue in a loop instead of calling next helps prevent stack overflow.
}
}
}
}
11 changes: 11 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,17 @@ where
|x: f64| x,
))
},
BinaryView | Utf8View => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
remove_nested(binview::NestedIter::new(
columns.pop().unwrap(),
init,
field.data_type().clone(),
num_rows,
chunk_size,
))
}
Binary | Utf8 => {
init.push(InitNested::Primitive(field.is_nullable));
types.pop();
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-parquet/src/arrow/read/deserialize/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::parquet::schema::types::{
PhysicalType, PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit,
};
use crate::parquet::types::int96_to_i64_ns;
use crate::read::deserialize::binview;

/// Converts an iterator of arrays to a trait object returning trait objects
#[inline]
Expand Down Expand Up @@ -336,6 +337,9 @@ pub fn page_iter_to_arrays<'a, I: PagesIter + 'a>(
pages, data_type, chunk_size, num_rows,
))
},
(PhysicalType::ByteArray, BinaryView | Utf8View) => {
Box::new(binview::BinaryViewArrayIter::new(pages, data_type, chunk_size, num_rows))
}

(_, Dictionary(key_type, _, _)) => {
return match_integer_type!(key_type, |$K| {
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-parquet/src/arrow/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,9 @@ fn array_to_page_nested(
let array = array.as_any().downcast_ref().unwrap();
binary::nested_array_to_page::<i64>(array, options, type_, nested)
},
BinaryView => {
binview::n
}
LargeBinary => {
let array = array.as_any().downcast_ref().unwrap();
binary::nested_array_to_page::<i64>(array, options, type_, nested)
Expand Down

0 comments on commit e1eeb04

Please sign in to comment.