Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: teach FSSTArray to compress the offsets of its codes #928

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 6 additions & 20 deletions bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@ use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
use vortex::array::ChunkedArray;
use vortex::arrow::FromArrowType;
use vortex::compress::CompressionStrategy;
use vortex::encoding::EncodingRef;
use vortex::{Array, Context, IntoArray};
use vortex_alp::ALPEncoding;
use vortex_datetime_parts::DateTimePartsEncoding;
use vortex_dict::DictEncoding;
use vortex_dtype::DType;
use vortex_fastlanes::{BitPackedEncoding, FoREncoding};
use vortex_roaring::RoaringBoolEncoding;
use vortex_runend::RunEndEncoding;
use vortex_fastlanes::DeltaEncoding;
use vortex_sampling_compressor::compressors::alp::ALPCompressor;
use vortex_sampling_compressor::compressors::bitpacked::BitPackedCompressor;
use vortex_sampling_compressor::compressors::date_time_parts::DateTimePartsCompressor;
Expand All @@ -50,19 +44,11 @@ pub mod tpch;
pub mod vortex_utils;

lazy_static! {
pub static ref CTX: Arc<Context> = Arc::new(Context::default().with_encodings([
&ALPEncoding as EncodingRef,
&DictEncoding,
&BitPackedEncoding,
&FoREncoding,
&DateTimePartsEncoding,
// &DeltaEncoding, Blows up the search space too much.
&RunEndEncoding,
&RoaringBoolEncoding,
// &RoaringIntEncoding,
// Doesn't offer anything more than FoR really
// &ZigZagEncoding,
]));
pub static ref CTX: Arc<Context> = Arc::new(
Context::default()
.with_encodings(SamplingCompressor::default().used_encodings())
.with_encoding(&DeltaEncoding)
);
}

lazy_static! {
Expand Down
13 changes: 3 additions & 10 deletions bench-vortex/src/tpch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ use datafusion::prelude::{CsvReadOptions, ParquetReadOptions, SessionContext};
use tokio::fs::OpenOptions;
use vortex::array::{ChunkedArray, StructArray};
use vortex::arrow::FromArrowArray;
use vortex::compress::CompressionStrategy;
use vortex::variants::StructArrayTrait;
use vortex::{Array, ArrayDType, Context, IntoArray, IntoArrayVariant};
use vortex::{Array, ArrayDType, IntoArray, IntoArrayVariant};
use vortex_datafusion::memory::VortexMemTableOptions;
use vortex_datafusion::persistent::config::{VortexFile, VortexTableOptions};
use vortex_datafusion::SessionContextExt;
use vortex_dtype::DType;
use vortex_sampling_compressor::SamplingCompressor;
use vortex_serde::layouts::LayoutWriter;

use crate::idempotent_async;
use crate::{idempotent_async, CTX};

pub mod dbgen;
mod execute;
Expand Down Expand Up @@ -312,12 +311,6 @@ async fn register_vortex_file(
})
.await?;

let ctx = if enable_compression {
Arc::new(Context::default().with_encodings(SamplingCompressor::default().used_encodings()))
} else {
Arc::new(Context::default())
};

let f = OpenOptions::new()
.read(true)
.write(true)
Expand All @@ -336,7 +329,7 @@ async fn register_vortex_file(
vtx_file.to_str().unwrap().to_string(),
file_size,
)],
ctx,
CTX.clone(),
),
)?;

Expand Down
4 changes: 3 additions & 1 deletion encodings/fsst/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,9 @@ impl FSSTArray {
impl AcceptArrayVisitor for FSSTArray {
fn accept(&self, visitor: &mut dyn vortex::visitor::ArrayVisitor) -> VortexResult<()> {
visitor.visit_child("symbols", &self.symbols())?;
visitor.visit_child("codes", &self.codes())
visitor.visit_child("symbol_lengths", &self.symbol_lengths())?;
visitor.visit_child("codes", &self.codes())?;
visitor.visit_child("uncompressed_lengths", &self.uncompressed_lengths())
}
}

Expand Down
27 changes: 24 additions & 3 deletions vortex-sampling-compressor/src/compressors/fsst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use std::fmt::Debug;
use std::sync::Arc;

use fsst::Compressor;
use vortex::array::{VarBin, VarBinView};
use vortex::array::{VarBin, VarBinArray, VarBinView};
use vortex::encoding::EncodingRef;
use vortex::{ArrayDType, ArrayDef, IntoArray};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};
use vortex_fsst::{fsst_compress, fsst_train_compressor, FSSTArray, FSSTEncoding, FSST};

use super::delta::DeltaCompressor;
use super::{CompressedArray, CompressionTree, EncoderMetadata, EncodingCompressor};
use crate::SamplingCompressor;

Expand Down Expand Up @@ -91,18 +92,38 @@ impl EncodingCompressor for FSSTCompressor {
like.as_ref().and_then(|l| l.child(0)),
)?;

let codes_varbin = VarBinArray::try_from(fsst_array.codes())?;
let codes_varbin_dtype = codes_varbin.dtype().clone();

let codes_offsets_compressed = ctx
.named("codes_varbin_offsets")
.excluding(self)
.including(&DeltaCompressor)
.compress(
&codes_varbin.offsets(),
like.as_ref().and_then(|l| l.child(1)),
)?;

let codes = VarBinArray::try_new(
codes_offsets_compressed.array,
codes_varbin.bytes(),
codes_varbin_dtype,
codes_varbin.validity(),
)?
.into_array();

Ok(CompressedArray::new(
FSSTArray::try_new(
fsst_array.dtype().clone(),
fsst_array.symbols(),
fsst_array.symbol_lengths(),
fsst_array.codes(),
codes,
uncompressed_lengths.array,
)?
.into_array(),
Some(CompressionTree::new_with_metadata(
self,
vec![uncompressed_lengths.path],
vec![uncompressed_lengths.path, codes_offsets_compressed.path],
compressor,
)),
))
Expand Down
6 changes: 6 additions & 0 deletions vortex-sampling-compressor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ impl<'a> SamplingCompressor<'a> {
cloned
}

pub fn including(&self, compressor: CompressorRef<'a>) -> Self {
let mut cloned = self.clone();
cloned.compressors.insert(compressor);
cloned
}

#[allow(clippy::same_name_method)]
pub fn compress(
&self,
Expand Down
Loading