Skip to content

Commit

Permalink
feat: support miniblock with binary data (#3099)
Browse files Browse the repository at this point in the history
This PR enables miniblock encoding with binary data type.

---------

Co-authored-by: Weston Pace <[email protected]>
  • Loading branch information
broccoliSpicy and westonpace authored Nov 8, 2024
1 parent 5c19fe5 commit 3f2faf2
Show file tree
Hide file tree
Showing 9 changed files with 409 additions and 34 deletions.
4 changes: 4 additions & 0 deletions protos/encodings.proto
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ message Binary {
uint64 null_adjustment = 3;
}

message BinaryMiniBlock {
}

message Fsst {
ArrayEncoding binary = 1;
bytes symbol_table = 2;
Expand Down Expand Up @@ -269,6 +272,7 @@ message ArrayEncoding {
BitpackedForNonNeg bitpacked_for_non_neg = 12;
Constant constant = 13;
Bitpack2 bitpack2 = 14;
BinaryMiniBlock binary_mini_block = 15;
}
}

Expand Down
89 changes: 86 additions & 3 deletions rust/lance-encoding/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,60 @@ impl FixedWidthDataBlock {
}
}

pub struct VariableWidthDataBlockBuilder {
offsets: Vec<u32>,
bytes: Vec<u8>,
}

impl VariableWidthDataBlockBuilder {
fn new(estimated_size_bytes: u64) -> Self {
Self {
offsets: vec![0u32],
bytes: Vec::with_capacity(estimated_size_bytes as usize),
}
}
}

impl DataBlockBuilderImpl for VariableWidthDataBlockBuilder {
fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>) {
let block = data_block.as_variable_width_mut_ref().unwrap();
assert!(block.bits_per_offset == 32);

let offsets = block.offsets.borrow_to_typed_slice::<u32>();
let offsets = offsets.as_ref();

let start_offset = offsets[selection.start as usize];
let end_offset = offsets[selection.end as usize];
let mut previous_len = self.bytes.len();

self.bytes
.extend_from_slice(&block.data[start_offset as usize..end_offset as usize]);

self.offsets.extend(
offsets[selection.start as usize..selection.end as usize]
.iter()
.zip(&offsets[selection.start as usize + 1..=selection.end as usize])
.map(|(&current, &next)| {
let this_value_len = next - current;
previous_len += this_value_len as usize;
previous_len as u32
}),
);
}

fn finish(self: Box<Self>) -> DataBlock {
let num_values = (self.offsets.len() - 1) as u64;
DataBlock::VariableWidth(VariableWidthBlock {
data: LanceBuffer::Owned(self.bytes),
offsets: LanceBuffer::reinterpret_vec(self.offsets),
bits_per_offset: 32,
num_values,
block_info: BlockInfo::new(),
used_encodings: UsedEncoding::new(),
})
}
}

pub struct FixedWidthDataBlockBuilder {
bits_per_value: u64,
bytes_per_value: u64,
Expand All @@ -311,7 +365,7 @@ impl FixedWidthDataBlockBuilder {
}

impl DataBlockBuilderImpl for FixedWidthDataBlockBuilder {
fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>) {
let block = data_block.as_fixed_width_ref().unwrap();
assert_eq!(self.bits_per_value, block.bits_per_value);
let start = selection.start as usize * self.bytes_per_value as usize;
Expand Down Expand Up @@ -853,6 +907,13 @@ impl DataBlock {
inner.bits_per_value,
estimated_size_bytes,
)),
Self::VariableWidth(inner) => {
if inner.bits_per_offset == 32 {
Box::new(VariableWidthDataBlockBuilder::new(estimated_size_bytes))
} else {
todo!()
}
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -880,6 +941,17 @@ macro_rules! as_type_ref {
};
}

macro_rules! as_type_ref_mut {
($fn_name:ident, $inner:tt, $inner_type:ident) => {
pub fn $fn_name(&mut self) -> Option<&mut $inner_type> {
match self {
Self::$inner(inner) => Some(inner),
_ => None,
}
}
};
}

// Cast implementations
impl DataBlock {
as_type!(as_all_null, AllNull, AllNullDataBlock);
Expand All @@ -896,6 +968,17 @@ impl DataBlock {
as_type_ref!(as_variable_width_ref, VariableWidth, VariableWidthBlock);
as_type_ref!(as_struct_ref, Struct, StructDataBlock);
as_type_ref!(as_dictionary_ref, Dictionary, DictionaryDataBlock);
as_type_ref_mut!(as_all_null_mut_ref, AllNull, AllNullDataBlock);
as_type_ref_mut!(as_nullable_mut_ref, Nullable, NullableDataBlock);
as_type_ref_mut!(as_fixed_width_mut_ref, FixedWidth, FixedWidthDataBlock);
as_type_ref_mut!(
as_fixed_size_list_mut_ref,
FixedSizeList,
FixedSizeListBlock
);
as_type_ref_mut!(as_variable_width_mut_ref, VariableWidth, VariableWidthBlock);
as_type_ref_mut!(as_struct_mut_ref, Struct, StructDataBlock);
as_type_ref_mut!(as_dictionary_mut_ref, Dictionary, DictionaryDataBlock);
}

// Methods to convert from Arrow -> DataBlock
Expand Down Expand Up @@ -1356,7 +1439,7 @@ impl From<ArrayRef> for DataBlock {
}

pub trait DataBlockBuilderImpl {
fn append(&mut self, data_block: &DataBlock, selection: Range<u64>);
fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>);
fn finish(self: Box<Self>) -> DataBlock;
}

Expand All @@ -1380,7 +1463,7 @@ impl DataBlockBuilder {
self.builder.as_mut().unwrap().as_mut()
}

pub fn append(&mut self, data_block: &DataBlock, selection: Range<u64>) {
pub fn append(&mut self, data_block: &mut DataBlock, selection: Range<u64>) {
self.get_builder(data_block).append(data_block, selection);
}

Expand Down
13 changes: 13 additions & 0 deletions rust/lance-encoding/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ use crate::encodings::logical::primitive::{
use crate::encodings::logical::r#struct::{
SimpleStructDecoder, SimpleStructScheduler, StructuralStructDecoder, StructuralStructScheduler,
};
use crate::encodings::physical::binary::BinaryMiniBlockDecompressor;
use crate::encodings::physical::bitpack_fastlanes::BitpackMiniBlockDecompressor;
use crate::encodings::physical::value::{ConstantDecompressor, ValueDecompressor};
use crate::encodings::physical::{ColumnBuffers, FileBuffers};
Expand Down Expand Up @@ -494,6 +495,9 @@ impl DecompressorStrategy for CoreDecompressorStrategy {
pb::array_encoding::ArrayEncoding::Bitpack2(description) => {
Ok(Box::new(BitpackMiniBlockDecompressor::new(description)))
}
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => {
Ok(Box::new(BinaryMiniBlockDecompressor::default()))
}
_ => todo!(),
}
}
Expand Down Expand Up @@ -741,6 +745,15 @@ impl CoreFieldDecoderStrategy {
as Box<dyn StructuralFieldScheduler>,
)
}
DataType::Binary | DataType::Utf8 => {
let column_info = column_infos.expect_next()?;
let scheduler = Box::new(StructuralPrimitiveFieldScheduler::try_new(
column_info.as_ref(),
self.decompressor_strategy.as_ref(),
)?);
column_infos.next_top_level();
Ok(scheduler)
}
_ => todo!(),
}
}
Expand Down
43 changes: 30 additions & 13 deletions rust/lance-encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashMap, env, sync::Arc};

use arrow::array::AsArray;
use arrow::datatypes::UInt64Type;
use arrow_array::PrimitiveArray;
use arrow_array::{Array, ArrayRef, RecordBatch, UInt8Array};
use arrow_schema::DataType;
use bytes::{Bytes, BytesMut};
Expand All @@ -24,6 +25,7 @@ use crate::encodings::logical::blob::BlobFieldEncoder;
use crate::encodings::logical::primitive::PrimitiveStructuralEncoder;
use crate::encodings::logical::r#struct::StructFieldEncoder;
use crate::encodings::logical::r#struct::StructStructuralEncoder;
use crate::encodings::physical::binary::BinaryMiniBlockEncoder;
use crate::encodings::physical::bitpack_fastlanes::BitpackedForNonNegArrayEncoder;
use crate::encodings::physical::bitpack_fastlanes::{
compute_compressed_bit_width_for_non_neg, BitpackMiniBlockEncoder,
Expand Down Expand Up @@ -158,6 +160,7 @@ pub struct MiniBlockCompressed {
/// 8KiB of compressed data. This means that even in the extreme case
/// where we have 4 bytes of rep/def then we will have at most 24KiB of
/// data (values, repetition, and definition) per mini-block.
#[derive(Debug)]
pub struct MiniBlockChunk {
// The number of bytes that make up the chunk
//
Expand Down Expand Up @@ -777,25 +780,25 @@ impl ArrayEncodingStrategy for CoreArrayEncodingStrategy {
}
}

const MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE: u64 = 256;

impl CompressionStrategy for CoreArrayEncodingStrategy {
fn create_miniblock_compressor(
&self,
field: &Field,
_field: &Field,
data: &DataBlock,
) -> Result<Box<dyn MiniBlockCompressor>> {
assert!(field.data_type().byte_width() > 0);
let bit_widths = data
.get_stat(Stat::BitWidth)
.expect("FixedWidthDataBlock should have valid bit width statistics");
// Temporary hack to work around https://github.com/lancedb/lance/issues/3102
// Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
let has_all_zeros = bit_widths
.as_primitive::<UInt64Type>()
.values()
.iter()
.any(|v| *v == 0);

if let DataBlock::FixedWidth(ref fixed_width_data) = data {
let bit_widths = data
.get_stat(Stat::BitWidth)
.expect("FixedWidthDataBlock should have valid bit width statistics");
// Temporary hack to work around https://github.com/lancedb/lance/issues/3102
// Ideally we should still be able to bit-pack here (either to 0 or 1 bit per value)
let has_all_zeros = bit_widths
.as_primitive::<UInt64Type>()
.values()
.iter()
.any(|v| *v == 0);
if !has_all_zeros
&& (fixed_width_data.bits_per_value == 8
|| fixed_width_data.bits_per_value == 16
Expand All @@ -805,6 +808,20 @@ impl CompressionStrategy for CoreArrayEncodingStrategy {
return Ok(Box::new(BitpackMiniBlockEncoder::default()));
}
}
if let DataBlock::VariableWidth(ref variable_width_data) = data {
if variable_width_data.bits_per_offset == 32 {
let max_len = data
.get_stat(Stat::MaxLength)
.expect("VariableWidthDataBlock should have valid max length statistics");
let max_len = max_len
.as_any()
.downcast_ref::<PrimitiveArray<UInt64Type>>()
.unwrap();
if max_len.value(0) < MINIBLOCK_MAX_BYTE_LENGTH_PER_VALUE {
return Ok(Box::new(BinaryMiniBlockEncoder::default()));
}
}
}
Ok(Box::new(ValueEncoder::default()))
}

Expand Down
4 changes: 2 additions & 2 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ impl DecodePageTask for DecodeMiniBlockTask {
let p2 = pad_bytes::<MINIBLOCK_ALIGNMENT>(6 + bytes_rep + p1 + bytes_def);
let values = buf.slice_with_length(6 + bytes_rep + bytes_def + p2, bytes_val);

let values = self
let mut values = self
.value_decompressor
.decompress(LanceBuffer::Borrowed(values), chunk.vals_in_chunk)?;

Expand Down Expand Up @@ -446,7 +446,7 @@ impl DecodePageTask for DecodeMiniBlockTask {
&def,
level_offset,
);
data_builder.append(&values, range);
data_builder.append(&mut values, range);
remaining -= to_take;
offset += to_take;
level_offset += to_take as usize;
Expand Down
1 change: 1 addition & 0 deletions rust/lance-encoding/src/encodings/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ pub fn decoder_from_array_encoding(
// 2.1 only
pb::array_encoding::ArrayEncoding::Constant(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::Bitpack2(_) => unreachable!(),
pb::array_encoding::ArrayEncoding::BinaryMiniBlock(_) => unreachable!(),
}
}

Expand Down
Loading

0 comments on commit 3f2faf2

Please sign in to comment.