Skip to content

Commit

Permalink
feat: teach FSSTArray to compress the offsets of its codes
Browse files Browse the repository at this point in the history
The codes of an FSSTArray are a vector of binary strings made up of one-byte codes or an escape code
followed by a data byte. The offsets, unexpectedly, grow quite large, increasing the file size (for
example, with this PR the TPC-H l_comment column is 78% of the byte size it has on `develop`). Delta
encoding notably decreases the size but also inflates the compression time, seemingly proportionally
to the space savings (TPC-H l_comment compresses in 111% of the time taken on `develop`).
  • Loading branch information
danking committed Sep 30, 2024
1 parent 49f05b4 commit f7636d9
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 36 deletions.
26 changes: 6 additions & 20 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray};
use vortex_alp::ALPEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
use vortex_dtype::DType;
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
use vortex_roaring::RoaringBoolEncoding;
use vortex_runend::RunEndEncoding;
use vortex_fastlanes::DeltaEncoding;
use vortex_sampling_compressor::compressors::alp::ALPCompressor;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
Expand All @@ -50,19 +44,11 @@ pub mod tpch;
pub mod vortex_utils;

lazy_static! {
pub static ref CTX: Arc<Context> = Arc::new(Context::default().with_encodings([
&ALPEncoding as EncodingRef,
&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
&DateTimePartsEncoding,
// &DeltaEncoding, Blows up the search space too much.
&RunEndEncoding,
&RoaringBoolEncoding,
// &RoaringIntEncoding,
// Doesn't offer anything more than FoR really
// &ZigZagEncoding,
]));
pub static ref CTX: Arc<Context> = Arc::new(
Context::default()
.with_encodings(SamplingCompressor::default().used_encodings())
.with_encoding(&DeltaEncoding)
);
}

lazy_static! {
Expand Down
13 changes: 3 additions & 10 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use tokio::fs::OpenOptions;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant};
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::SessionContextExt;
use vortex_dtype::DType;
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::layouts::LayoutWriter;

use crate::idempotent_async;
use crate::{idempotent_async, CTX};

pub mod dbgen;
mod execute;
Expand Down Expand Up @@ -306,12 +305,6 @@ async fn register_vortex_file(
})
.await?;

let ctx = if enable_compression {
Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()))
} else {
Arc::new(Context::default())
};

let f = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -330,7 +323,7 @@ async fn register_vortex_file(
vtx_file.to_str().unwrap().to_string(),
file_size,
)],
ctx,
CTX.clone(),
),
)?;

Expand Down
4 changes: 3 additions & 1 deletion encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ impl FSSTArray {
impl AcceptArrayVisitor for FSSTArray {
fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("symbols", &self.symbols())?;
visitor.visit_child("codes", &self.codes())
visitor.visit_child("symbol_lengths", &self.symbol_lengths())?;
visitor.visit_child("codes", &self.codes())?;
visitor.visit_child("uncompressed_lengths", &self.uncompressed_lengths())
}
}

Expand Down
15 changes: 13 additions & 2 deletions vortex-sampling-compressor/src/compressors/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;

use vortex::array::PrimitiveArray;
use vortex::encoding::EncodingRef;
use vortex::stats::ArrayStatistics;
use vortex::{Array, ArrayDef, IntoArray};
use vortex_error::VortexResult;
use vortex_fastlanes::{delta_compress, Delta, DeltaArray, DeltaEncoding};
Expand All @@ -12,6 +13,10 @@ use crate::SamplingCompressor;
/// Sampling-compressor wrapper around FastLanes delta encoding
/// (`vortex_fastlanes::delta_compress`); unit struct carrying no state.
// NOTE(review): per `can_compress` below, this only accepts primitive
// integer arrays whose values are provably non-negative.
#[derive(Debug)]
pub struct DeltaCompressor;

/// Returns `true` when `v` is negative or unknown.
///
/// Used to reject signed arrays that might contain negative values:
/// a `None` minimum statistic means the bound could not be computed,
/// so we conservatively treat the array as possibly negative.
fn possibly_negative(v: Option<i64>) -> bool {
    // `map_or` is the idiomatic single-call form of
    // `map(..).unwrap_or(..)` (clippy: `map_unwrap_or`).
    v.map_or(true, |x| x < 0)
}

impl EncodingCompressor for DeltaCompressor {
/// Identifier for this compressor: delegates to the Delta encoding's ID
/// constant so the compressor and the encoding always report the same name.
fn id(&self) -> &str {
Delta::ID.as_ref()
}
// Only support primitive arrays
let parray = PrimitiveArray::try_from(array).ok()?;

// Only supports ints
if !parray.ptype().is_unsigned_int() {
if !parray.ptype().is_int() {
return None;
}

if parray.ptype().is_signed_int() {
let min = parray.statistics().compute_min::<i64>();
if possibly_negative(min) {
return None;
}
}

Some(self)
}

Expand Down
34 changes: 31 additions & 3 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use std::fmt::Debug;
use std::sync::Arc;

use fsst::Compressor;
use vortex::array::{VarBin, VarBinView};
use vortex::array::{VarBin, VarBinArray, VarBinView};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding, FSST};

use super::delta::DeltaCompressor;
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::SamplingCompressor;

Expand Down Expand Up @@ -91,18 +92,45 @@ impl EncodingCompressor for FSSTCompressor {
like.as_ref().and_then(|l| l.child(0)),
)?;

let codes_compressor = ctx.auxiliary("codes");
let codes_varbin = VarBinArray::try_from(fsst_array.codes())?;
let codes_varbin_dtype = codes_varbin.dtype().clone();

let codes_offsets_delta = DeltaCompressor.compress(
&codes_varbin.offsets(),
like.as_ref().and_then(|l| l.child(1).cloned()),
ctx.auxiliary("offsets"),
)?;

let codes_offsets_compressed = codes_compressor.auxiliary("delta_offsets").compress(
&codes_offsets_delta.array,
like.as_ref().and_then(|l| l.child(2)),
)?;

let codes = VarBinArray::try_new(
codes_offsets_compressed.array,
codes_varbin.bytes(),
codes_varbin_dtype,
codes_varbin.validity(),
)?
.into_array();

Ok(CompressedArray::new(
FSSTArray::try_new(
fsst_array.dtype().clone(),
fsst_array.symbols(),
fsst_array.symbol_lengths(),
fsst_array.codes(),
codes,
uncompressed_lengths.array,
)?
.into_array(),
Some(CompressionTree::new_with_metadata(
self,
vec![uncompressed_lengths.path],
vec![
uncompressed_lengths.path,
codes_offsets_delta.path,
codes_offsets_compressed.path,
],
compressor,
)),
))
Expand Down

0 comments on commit f7636d9

Please sign in to comment.