perf(parquet): use optimized bloom filter #4441

Closed
wants to merge 9 commits into from
Changes from 2 commits
3 changes: 2 additions & 1 deletion parquet/Cargo.toml
@@ -64,8 +64,9 @@ seq-macro = { version = "0.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
hashbrown = { version = "0.14", default-features = false }
twox-hash = { version = "1.6", default-features = false }
xxhash-rust = { version = "0.8", features = ["xxh64"] }
paste = { version = "1.0" }
sbbf-rs-safe = "0.2"
Contributor

Rather than using a new crate (which would add a new dependency that needs to be maintained, etc.), what do you think about inlining your implementation into this repository?

Author

@ozgrakkurt ozgrakkurt Jun 22, 2023

I would like to keep it separate, since it can be used in a lot of other applications. There are a lot of Bloom filter libraries for Rust, but none of them implement split block variants, afaik.
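
For readers unfamiliar with the split block variant mentioned here, a minimal sketch may help. It mirrors the `Block::mask`, `insert`, `check` and `hash_to_block_index` logic that this PR removes from `parquet/src/bloom_filter/mod.rs`. The names `SketchSbbf` and `with_num_blocks` are hypothetical and used only for illustration; this is not the implementation inside `sbbf-rs-safe`.

```rust
/// Salt constants from the Parquet Bloom filter spec (same values as the removed code).
const SALT: [u32; 8] = [
    0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
];

/// One 256-bit block: eight 32-bit words.
type Block = [u32; 8];

/// Build a mask with exactly one bit set in each of the eight words.
fn mask(x: u32) -> Block {
    let mut m = [0u32; 8];
    for i in 0..8 {
        // The top 5 bits of the wrapped product select a bit position in 0..32.
        m[i] = 1u32 << (x.wrapping_mul(SALT[i]) >> 27);
    }
    m
}

/// Hypothetical minimal filter, for illustration only.
struct SketchSbbf {
    blocks: Vec<Block>,
}

impl SketchSbbf {
    fn with_num_blocks(n: usize) -> Self {
        assert!(n > 0, "a filter needs at least one block");
        Self { blocks: vec![[0u32; 8]; n] }
    }

    /// The upper 32 bits of the hash pick the block (multiply-shift into 0..len).
    fn block_index(&self, hash: u64) -> usize {
        (((hash >> 32).saturating_mul(self.blocks.len() as u64)) >> 32) as usize
    }

    /// Set every mask bit in the selected block; the lower 32 bits drive the mask.
    fn insert_hash(&mut self, hash: u64) {
        let idx = self.block_index(hash);
        let m = mask(hash as u32);
        for (word, bit) in self.blocks[idx].iter_mut().zip(m.iter()) {
            *word |= *bit;
        }
    }

    /// True only if every mask bit is already set in the selected block.
    fn check_hash(&self, hash: u64) -> bool {
        let idx = self.block_index(hash);
        let m = mask(hash as u32);
        self.blocks[idx]
            .iter()
            .zip(m.iter())
            .all(|(word, bit)| word & bit != 0)
    }
}
```

Because each lookup touches a single 256-bit block, one insert or check stays within one cache line and maps naturally onto SIMD registers, which is presumably where an optimized implementation gains its speed.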


[dev-dependencies]
base64 = { version = "0.21", default-features = false, features = ["std"] }
178 changes: 19 additions & 159 deletions parquet/src/bloom_filter/mod.rs
@@ -27,111 +27,19 @@ use crate::format::{
SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use sbbf_rs_safe::Filter;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach).
const SALT: [u32; 8] = [
0x47b6137b_u32,
0x44974d91_u32,
0x8824ad5b_u32,
0xa2b7289d_u32,
0x705495c7_u32,
0x2df1424b_u32,
0x9efc4947_u32,
0x5c6bfb31_u32,
];

/// Each block is 256 bits, broken up into eight contiguous "words", each consisting of 32 bits.
/// Each word is thought of as an array of bits; each bit is either "set" or "not set".
#[derive(Debug, Copy, Clone)]
struct Block([u32; 8]);
impl Block {
const ZERO: Block = Block([0; 8]);

/// takes as its argument a single unsigned 32-bit integer and returns a block in which each
/// word has exactly one bit set.
fn mask(x: u32) -> Self {
let mut result = [0_u32; 8];
for i in 0..8 {
// wrapping instead of checking for overflow
let y = x.wrapping_mul(SALT[i]);
let y = y >> 27;
result[i] = 1 << y;
}
Self(result)
}

#[inline]
#[cfg(target_endian = "little")]
fn to_le_bytes(self) -> [u8; 32] {
self.to_ne_bytes()
}

#[inline]
#[cfg(not(target_endian = "little"))]
fn to_le_bytes(self) -> [u8; 32] {
self.swap_bytes().to_ne_bytes()
}

#[inline]
fn to_ne_bytes(self) -> [u8; 32] {
unsafe { std::mem::transmute(self) }
}

#[inline]
#[cfg(not(target_endian = "little"))]
fn swap_bytes(mut self) -> Self {
self.0.iter_mut().for_each(|x| *x = x.swap_bytes());
self
}

/// setting every bit in the block that was also set in the result from mask
fn insert(&mut self, hash: u32) {
let mask = Self::mask(hash);
for i in 0..8 {
self[i] |= mask[i];
}
}

/// returns true when every bit that is set in the result of mask is also set in the block.
fn check(&self, hash: u32) -> bool {
let mask = Self::mask(hash);
for i in 0..8 {
if self[i] & mask[i] == 0 {
return false;
}
}
true
}
}

impl std::ops::Index<usize> for Block {
type Output = u32;

#[inline]
fn index(&self, index: usize) -> &Self::Output {
self.0.index(index)
}
}

impl std::ops::IndexMut<usize> for Block {
#[inline]
fn index_mut(&mut self, index: usize) -> &mut Self::Output {
self.0.index_mut(index)
}
}
use xxhash_rust::xxh64::xxh64;
Contributor

Looks like (on OSX) this is also part of the speedup.

Author

yeah, forgot about that

Author

Also, in my insert bench I also call contains on the parquet impl. But there is a big diff in the integration tests of writing parquet with bloom filters.

Author

I also realised I don't use the sbbf filter properly in this PR branch: it allocates a new buffer when Sbbf::new is called, which ideally shouldn't be the case. It can just borrow the bitset, assuming it is aligned to 64 bits.
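
A rough sketch of the borrowing idea, under stated assumptions: `words_from_bitset` is a hypothetical helper, not an API of `sbbf-rs-safe` or of this crate. It views the serialized bitset as 64-bit words without copying when the buffer happens to be 8-byte aligned, and falls back to a single allocation otherwise. The words are read in native byte order, so this matches the little-endian on-disk layout only on little-endian targets.

```rust
use std::borrow::Cow;

/// Hypothetical helper: borrow the bitset as `u64` words when it is suitably
/// aligned, otherwise copy it once. Trailing bytes that do not fill a whole
/// word are dropped in the fallback path (Parquet bitsets are multiples of 32 bytes).
fn words_from_bitset(bitset: &[u8]) -> Cow<'_, [u64]> {
    // SAFETY: every bit pattern is a valid `u64`, and `align_to` only returns
    // a middle slice that is correctly aligned for `u64`.
    let (prefix, words, suffix) = unsafe { bitset.align_to::<u64>() };
    if prefix.is_empty() && suffix.is_empty() {
        // Aligned and a whole number of words: no allocation needed.
        Cow::Borrowed(words)
    } else {
        // Misaligned buffer: decode word by word into a new vector.
        Cow::Owned(
            bitset
                .chunks_exact(8)
                .map(|chunk| {
                    let mut buf = [0u8; 8];
                    buf.copy_from_slice(chunk);
                    u64::from_ne_bytes(buf)
                })
                .collect(),
        )
    }
}
```

Returning a `Cow` keeps the caller's code identical for both cases; whether the real filter type can hold a borrowed buffer depends on its API, which is not shown in this PR.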


/// A split block Bloom filter. The creation of this structure is based on the
/// [`crate::file::properties::BloomFilterProperties`] struct set via [`crate::file::properties::WriterProperties`] and
/// is thus hidden by default.
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);
pub struct Sbbf(Filter);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;

@@ -206,17 +114,7 @@ impl Sbbf {
}

fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
.map(|chunk| {
let mut block = Block::ZERO;
for (i, word) in chunk.chunks_exact(4).enumerate() {
block[i] = u32::from_le_bytes(word.try_into().unwrap());
}
block
})
.collect::<Vec<Block>>();
Self(data)
Self(Filter::from_bytes(bitset).unwrap())
}

/// Write the bloom filter data (header and then bitset) to the output. This doesn't
@@ -235,23 +133,21 @@ impl Sbbf {

/// Write the bitset in serialized form to the writer.
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
for block in &self.0 {
writer
.write_all(block.to_le_bytes().as_slice())
.map_err(|e| {
ParquetError::General(format!(
"Could not write bloom filter bit set: {e}"
))
})?;
}
writer
.write_all(self.0.as_bytes())
.map_err(|e| {
ParquetError::General(format!(
"Could not write bloom filter bit set: {e}"
))
})?;
Ok(())
}

/// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
fn header(&self) -> BloomFilterHeader {
BloomFilterHeader {
// 8 i32 per block, 4 bytes per i32
num_bytes: self.0.len() as i32 * 4 * 8,
num_bytes: i32::try_from(self.0.as_bytes().len()).unwrap(),
algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
hash: BloomFilterHash::XXHASH(XxHash {}),
compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
@@ -297,22 +193,15 @@ impl Sbbf {
Ok(Some(Self::new(&bitset)))
}

#[inline]
fn hash_to_block_index(&self, hash: u64) -> usize {
// unchecked_mul is unstable, but in reality this is safe, we'd just use saturating mul
// but it will not saturate
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
}

/// Insert an [AsBytes] value into the filter
pub fn insert<T: AsBytes + ?Sized>(&mut self, value: &T) {
self.insert_hash(hash_as_bytes(value));
}

/// Insert a hash into the filter
fn insert_hash(&mut self, hash: u64) {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].insert(hash as u32)
/// Insert a hash into the filter.
/// Returns true if the filter might have already contained the hash.
fn insert_hash(&mut self, hash: u64) -> bool {
self.0.insert_hash(hash)
}

/// Check if an [AsBytes] value is probably present or definitely absent in the filter
@@ -324,19 +213,16 @@ impl Sbbf {
/// true for values that was never inserted ("false positive")
/// but will always return false if a hash has not been inserted.
fn check_hash(&self, hash: u64) -> bool {
let block_index = self.hash_to_block_index(hash);
self.0[block_index].check(hash as u32)
self.0.contains_hash(hash)
}
}

// per spec we use xxHash with seed=0
const SEED: u64 = 0;

#[inline]
#[inline(always)]
fn hash_as_bytes<A: AsBytes + ?Sized>(value: &A) -> u64 {
let mut hasher = XxHash64::with_seed(SEED);
hasher.write(value.as_bytes());
hasher.finish()
xxh64(value.as_bytes(), SEED)
}

#[cfg(test)]
@@ -352,32 +238,6 @@ mod tests {
assert_eq!(hash_as_bytes(""), 17241709254077376921);
}

#[test]
fn test_mask_set_quick_check() {
for i in 0..1_000_000 {
let result = Block::mask(i);
assert!(result.0.iter().all(|&x| x.count_ones() == 1));
}
}

#[test]
fn test_block_insert_and_check() {
for i in 0..1_000_000 {
let mut block = Block::ZERO;
block.insert(i);
assert!(block.check(i));
}
}

#[test]
fn test_sbbf_insert_and_check() {
let mut sbbf = Sbbf(vec![Block::ZERO; 1_000]);
for i in 0..1_000_000 {
sbbf.insert(&i);
assert!(sbbf.check(&i));
}
}

#[test]
fn test_with_fixture() {
// bloom filter produced by parquet-mr/spark for a column of i64 f"a{i}" for i in 0..10