diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs b/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs deleted file mode 100644 index c0ec1b9f55e..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/block/delta.rs +++ /dev/null @@ -1,351 +0,0 @@ -use crate::blockstore::{ - arrow_blockfile::{blockfile::MAX_BLOCK_SIZE, provider::ArrowBlockProvider}, - types::{BlockfileKey, KeyType, Value, ValueType}, -}; -use arrow::util::bit_util; -use parking_lot::RwLock; -use std::{collections::BTreeMap, sync::Arc}; - -use super::{Block, BlockBuilderOptions, BlockData, BlockDataBuilder}; - -#[derive(Clone)] -pub struct BlockDelta { - pub source_block: Arc, - inner: Arc>, -} - -impl BlockDelta { - pub fn can_add(&self, key: &BlockfileKey, value: &Value) -> bool { - let inner = self.inner.read(); - inner.can_add(key, value) - } - - pub fn add(&self, key: BlockfileKey, value: Value) { - let mut inner = self.inner.write(); - inner.add(key, value); - } - - pub fn delete(&self, key: BlockfileKey) { - let mut inner = self.inner.write(); - inner.delete(key); - } - - pub fn get_min_key(&self) -> Option { - let inner = self.inner.read(); - let first_key = inner.new_data.keys().next(); - first_key.cloned() - } - - fn get_prefix_size(&self) -> usize { - let inner = self.inner.read(); - inner.get_prefix_size() - } - - fn get_key_size(&self) -> usize { - let inner = self.inner.read(); - inner.get_key_size() - } - - fn get_value_size(&self) -> usize { - let inner = self.inner.read(); - inner.get_value_size() - } - - fn get_value_count(&self) -> usize { - let inner = self.inner.read(); - inner.get_value_count() - } - - pub fn get_size(&self) -> usize { - let inner = self.inner.read(); - inner.get_size( - self.source_block.get_key_type(), - self.source_block.get_value_type(), - ) - } - - fn len(&self) -> usize { - let inner = self.inner.read(); - inner.new_data.len() - } - - pub fn split(&self, provider: &ArrowBlockProvider) -> (BlockfileKey, BlockDelta) { - let new_block = provider.create_block( - self.source_block.get_key_type(), - self.source_block.get_value_type(), - ); - let mut inner = self.inner.write(); - let (split_key, new_adds) = inner.split( - self.source_block.get_key_type(), - self.source_block.get_value_type(), - ); - ( - split_key, - BlockDelta { - source_block: new_block, - inner: Arc::new(RwLock::new(BlockDeltaInner { new_data: new_adds })), - }, - ) - } -} - -struct BlockDeltaInner { - new_data: BTreeMap, -} - -impl BlockDeltaInner { - fn add(&mut self, key: BlockfileKey, value: Value) { - self.new_data.insert(key, value); - } - - fn delete(&mut self, key: BlockfileKey) { - if self.new_data.contains_key(&key) { - self.new_data.remove(&key); - } - } - - fn get_block_size( - &self, - item_count: usize, - prefix_size: usize, - key_size: usize, - value_size: usize, - key_type: KeyType, - value_type: ValueType, - ) -> usize { - let prefix_total_bytes = bit_util::round_upto_multiple_of_64(prefix_size); - let prefix_offset_bytes = bit_util::round_upto_multiple_of_64((item_count + 1) * 4); - - // https://docs.rs/arrow/latest/arrow/array/array/struct.GenericListArray.html - let key_total_bytes = bit_util::round_upto_multiple_of_64(key_size); - let key_offset_bytes = match key_type { - KeyType::String => bit_util::round_upto_multiple_of_64((item_count + 1) * 4), - KeyType::Float => 0, - }; - - let value_total_bytes = bit_util::round_upto_multiple_of_64(value_size); - let value_offset_bytes = match value_type { - ValueType::Int32Array | ValueType::String => { - 
bit_util::round_upto_multiple_of_64((item_count + 1) * 4) - } - _ => unimplemented!("Value type not implemented"), - }; - - prefix_total_bytes - + prefix_offset_bytes - + key_total_bytes - + key_offset_bytes - + value_total_bytes - + value_offset_bytes - } - - fn get_size(&self, key_type: KeyType, value_type: ValueType) -> usize { - let prefix_data_size = self.get_prefix_size(); - let key_data_size = self.get_key_size(); - let value_data_size = self.get_value_size(); - - self.get_block_size( - self.new_data.len(), - prefix_data_size, - key_data_size, - value_data_size, - key_type, - value_type, - ) - } - - fn get_prefix_size(&self) -> usize { - self.new_data - .iter() - .fold(0, |acc, (key, _)| acc + key.get_prefix_size()) - } - - fn get_key_size(&self) -> usize { - self.new_data - .iter() - .fold(0, |acc, (key, _)| acc + key.key.get_size()) - } - - fn get_value_size(&self) -> usize { - self.new_data - .iter() - .fold(0, |acc, (_, value)| acc + value.get_size()) - } - - fn get_value_count(&self) -> usize { - self.new_data.iter().fold(0, |acc, (_, value)| match value { - Value::Int32ArrayValue(arr) => acc + arr.len(), - Value::StringValue(s) => acc + s.len(), - _ => unimplemented!("Value type not implemented"), - }) - } - - fn can_add(&self, key: &BlockfileKey, value: &Value) -> bool { - // TODO: move this into add with an error - let additional_prefix_size = key.get_prefix_size(); - let additional_key_size = key.key.get_size(); - let additional_value_size = value.get_size(); - - let prefix_data_size = self.get_prefix_size() + additional_prefix_size; - let key_data_size = self.get_key_size() + additional_key_size; - let value_data_size = self.get_value_size() + additional_value_size; - // TODO: use the same offset matching as in get_block_size - let prefix_offset_size = (self.new_data.len() + 1) * 4; - let key_offset_size = (self.new_data.len() + 1) * 4; - let value_offset_size = (self.new_data.len() + 1) * 4; - - let prefix_total_bytes = bit_util::round_upto_multiple_of_64(prefix_data_size) - + bit_util::round_upto_multiple_of_64(prefix_offset_size); - let key_total_bytes = bit_util::round_upto_multiple_of_64(key_data_size) - + bit_util::round_upto_multiple_of_64(key_offset_size); - let value_total_bytes = bit_util::round_upto_multiple_of_64(value_data_size) - + bit_util::round_upto_multiple_of_64(value_offset_size); - let total_future_size = prefix_total_bytes + key_total_bytes + value_total_bytes; - - total_future_size <= MAX_BLOCK_SIZE - } - - fn split( - &mut self, - key_type: KeyType, - value_type: ValueType, - ) -> (BlockfileKey, BTreeMap) { - let half_size = MAX_BLOCK_SIZE / 2; - let mut running_prefix_size = 0; - let mut running_key_size = 0; - let mut running_value_size = 0; - let mut running_count = 0; - let mut split_key = None; - // The split key will be the last key that pushes the block over the half size. 
Not the first key that pushes it over - for (key, value) in self.new_data.iter() { - running_prefix_size += key.get_prefix_size(); - running_key_size += key.key.get_size(); - running_value_size += value.get_size(); - running_count += 1; - let current_size = self.get_block_size( - running_count, - running_prefix_size, - running_key_size, - running_value_size, - key_type, - value_type, - ); - if half_size < current_size { - break; - } - split_key = Some(key.clone()); - } - - match &split_key { - None => panic!("No split point found"), - Some(split_key) => { - let split_after = self.new_data.split_off(split_key); - return (split_key.clone(), split_after); - } - } - } -} - -impl From<&BlockDelta> for BlockData { - fn from(delta: &BlockDelta) -> Self { - let mut builder = BlockDataBuilder::new( - delta.source_block.get_key_type(), - delta.source_block.get_value_type(), - Some(BlockBuilderOptions::new( - delta.len(), - delta.get_prefix_size(), - delta.get_key_size(), - delta.get_value_count(), - delta.get_value_size(), - )), - ); - for (key, value) in delta.inner.read().new_data.iter() { - builder.add(key.clone(), value.clone()); - } - builder.build() - } -} - -impl From> for BlockDelta { - fn from(source_block: Arc) -> Self { - // Read the exising block and put it into adds. We only create these - // when we have a write to this block, so we don't care about the cost of - // reading the block. Since we know we will have to do that no matter what. - let mut adds = BTreeMap::new(); - let source_block_iter = source_block.iter(); - for (key, value) in source_block_iter { - adds.insert(key, value); - } - BlockDelta { - source_block, - inner: Arc::new(RwLock::new(BlockDeltaInner { new_data: adds })), - } - } -} - -#[cfg(test)] -mod test { - use arrow::array::{Array, Int32Array}; - use rand::{random, Rng}; - - use crate::blockstore::types::{Key, KeyType, ValueType}; - - use super::*; - - #[test] - fn test_sizing_int_arr_val() { - let block_provider = ArrowBlockProvider::new(); - let block = block_provider.create_block(KeyType::String, ValueType::Int32Array); - let delta = BlockDelta::from(block.clone()); - - let n = 2000; - for i in 0..n { - let key = BlockfileKey::new("prefix".to_string(), Key::String(format!("key{}", i))); - let value_len: usize = rand::thread_rng().gen_range(1..100); - let mut new_vec = Vec::with_capacity(value_len); - for _ in 0..value_len { - new_vec.push(random::()); - } - delta.add(key, Value::Int32ArrayValue(Int32Array::from(new_vec))); - } - - let size = delta.get_size(); - let block_data = BlockData::from(&delta); - assert_eq!(size, block_data.get_size()); - } - - #[test] - fn test_sizing_string_val() { - let block_provider = ArrowBlockProvider::new(); - let block = block_provider.create_block(KeyType::String, ValueType::String); - let delta = BlockDelta::from(block.clone()); - - let n = 2000; - for i in 0..n { - let key = BlockfileKey::new("prefix".to_string(), Key::String(format!("key{}", i))); - let value = Value::StringValue(format!("value{}", i)); - delta.add(key, value); - } - let size = delta.get_size(); - let block_data = BlockData::from(&delta); - assert_eq!(size, block_data.get_size()); - } - - #[test] - fn test_sizing_int_key() { - let block_provider = ArrowBlockProvider::new(); - let block = block_provider.create_block(KeyType::Float, ValueType::String); - let delta = BlockDelta::from(block.clone()); - - let n = 2000; - for i in 0..n { - let key = BlockfileKey::new("prefix".to_string(), Key::Float(i as f32)); - let value = Value::StringValue(format!("value{}", 
i)); - delta.add(key, value); - } - - let size = delta.get_size(); - let block_data = BlockData::from(&delta); - assert_eq!(size, block_data.get_size()); - } -} diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/iterator.rs b/rust/worker/src/blockstore/arrow_blockfile/block/iterator.rs deleted file mode 100644 index e04c362d7e4..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/block/iterator.rs +++ /dev/null @@ -1,93 +0,0 @@ -use super::types::Block; -use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType}; -use arrow::array::{Array, Int32Array, ListArray, StringArray}; - -pub(super) struct BlockIterator { - block: Block, - index: usize, - key_type: KeyType, - value_type: ValueType, -} - -impl BlockIterator { - pub fn new(block: Block, key_type: KeyType, value_type: ValueType) -> Self { - Self { - block, - index: 0, - key_type, - value_type, - } - } -} - -impl Iterator for BlockIterator { - type Item = (BlockfileKey, Value); - - fn next(&mut self) -> Option { - let data = &self.block.inner.read().data; - if data.is_none() { - return None; - } - - // TODO: clean up unwraps - let prefix = data.as_ref().unwrap().data.column(0); - let key = data.as_ref().unwrap().data.column(1); - let value = data.as_ref().unwrap().data.column(2); - - if self.index >= prefix.len() { - return None; - } - - let prefix = match prefix.as_any().downcast_ref::() { - Some(prefix) => prefix.value(self.index).to_owned(), - None => return None, - }; - - let key = match self.key_type { - KeyType::String => match key.as_any().downcast_ref::() { - Some(key) => Key::String(key.value(self.index).to_string()), - None => return None, - }, - KeyType::Float => match key.as_any().downcast_ref::() { - Some(key) => Key::Float(key.value(self.index) as f32), - None => return None, - }, - }; - - let value = match self.value_type { - ValueType::Int32Array => match value.as_any().downcast_ref::() { - Some(value) => { - let value = match value - .value(self.index) - .as_any() - .downcast_ref::() - { - Some(value) => { - // An arrow array, if nested in a larger structure, when cloned may clone the entire larger buffer. - // This leads to a memory overhead and also breaks our sizing assumptions. In order to work around this, - // we have to manuallly create a new array and copy the data over rather than relying on clone. - - // Note that we use a vector here to avoid the overhead of the builder. The from() method for primitive - // types uses unsafe code to wrap the vecs underlying buffer in an arrow array. - - // There are more performant ways to do this, but this is the most straightforward. 
- - let mut new_vec = Vec::with_capacity(value.len()); - for i in 0..value.len() { - new_vec.push(value.value(i)); - } - let value = Int32Array::from(new_vec); - Value::Int32ArrayValue(value) - } - None => return None, - }; - value - } - None => return None, - }, - _ => unimplemented!(), - }; - self.index += 1; - Some((BlockfileKey::new(prefix, key), value)) - } -} diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs b/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs deleted file mode 100644 index 1320c0bb6d5..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/block/mod.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub(in crate::blockstore::arrow_blockfile) mod delta; -mod iterator; -mod types; - -// Re-export types at the module level -pub(in crate::blockstore::arrow_blockfile) use types::*; diff --git a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs b/rust/worker/src/blockstore/arrow_blockfile/block/types.rs deleted file mode 100644 index cfe27668e87..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/block/types.rs +++ /dev/null @@ -1,463 +0,0 @@ -use super::delta::BlockDelta; -use super::iterator::BlockIterator; -use crate::blockstore::types::{BlockfileKey, Key, KeyType, Value, ValueType}; -use crate::errors::{ChromaError, ErrorCodes}; -use arrow::array::{Float32Array, Float32Builder}; -use arrow::{ - array::{Array, Int32Array, Int32Builder, ListArray, ListBuilder, StringArray, StringBuilder}, - datatypes::{DataType, Field}, - record_batch::RecordBatch, -}; -use parking_lot::RwLock; -use std::sync::Arc; -use thiserror::Error; -use uuid::Uuid; - -#[derive(Clone, Copy)] -pub(in crate::blockstore::arrow_blockfile) enum BlockState { - Uninitialized, - Initialized, - Commited, - Registered, -} - -pub(super) struct Inner { - pub(super) id: Uuid, - pub(super) data: Option, - pub(super) state: BlockState, - pub(super) key_type: KeyType, - pub(super) value_type: ValueType, -} - -#[derive(Clone)] -pub(in crate::blockstore::arrow_blockfile) struct Block { - pub(super) inner: Arc>, -} - -#[derive(Error, Debug)] -pub enum BlockError { - #[error("Invalid state transition")] - InvalidStateTransition, -} - -impl ChromaError for BlockError { - fn code(&self) -> ErrorCodes { - match self { - BlockError::InvalidStateTransition => ErrorCodes::Internal, - } - } -} - -impl Block { - pub fn new(id: Uuid, key_type: KeyType, value_type: ValueType) -> Self { - Self { - inner: Arc::new(RwLock::new(Inner { - id, - data: None, - state: BlockState::Uninitialized, - key_type, - value_type, - })), - } - } - - pub(in crate::blockstore::arrow_blockfile) fn get( - &self, - query_key: &BlockfileKey, - ) -> Option { - match &self.inner.read().data { - Some(data) => { - let prefix = data.data.column(0); - let key = data.data.column(1); - let value = data.data.column(2); - // TODO: binary search - // TODO: clean this up - for i in 0..prefix.len() { - if prefix - .as_any() - .downcast_ref::() - .unwrap() - .value(i) - == query_key.prefix - { - let key_matches = match &query_key.key { - Key::String(inner_key) => { - inner_key - == key.as_any().downcast_ref::().unwrap().value(i) - } - Key::Float(inner_key) => { - *inner_key - == key - .as_any() - .downcast_ref::() - .unwrap() - .value(i) - } - }; - if key_matches { - match self.get_value_type() { - ValueType::Int32Array => { - return Some(Value::Int32ArrayValue( - value - .as_any() - .downcast_ref::() - .unwrap() - .value(i) - .as_any() - .downcast_ref::() - .unwrap() - .clone(), - )) - } - ValueType::String => { - return 
Some(Value::StringValue( - value - .as_any() - .downcast_ref::() - .unwrap() - .value(i) - .to_string(), - )) - } - _ => unimplemented!(), - } - } - } - } - None - } - None => None, - } - } - - pub(in crate::blockstore::arrow_blockfile) fn get_size(&self) -> usize { - match &self.inner.read().data { - Some(data) => data.get_size(), - None => 0, - } - } - - pub(in crate::blockstore::arrow_blockfile) fn len(&self) -> usize { - match &self.inner.read().data { - Some(data) => data.data.column(0).len(), - None => 0, - } - } - - pub(in crate::blockstore::arrow_blockfile) fn get_id(&self) -> Uuid { - self.inner.read().id - } - - pub(in crate::blockstore::arrow_blockfile) fn get_key_type(&self) -> KeyType { - self.inner.read().key_type - } - - pub(in crate::blockstore::arrow_blockfile) fn get_value_type(&self) -> ValueType { - self.inner.read().value_type - } - - pub(in crate::blockstore::arrow_blockfile) fn get_state(&self) -> BlockState { - self.inner.read().state - } - - pub(in crate::blockstore::arrow_blockfile) fn apply_delta( - &self, - delta: &BlockDelta, - ) -> Result<(), Box> { - let data = BlockData::from(delta); - let mut inner = self.inner.write(); - match inner.state { - BlockState::Uninitialized => { - inner.data = Some(data); - inner.state = BlockState::Initialized; - Ok(()) - } - BlockState::Initialized => { - inner.data = Some(data); - inner.state = BlockState::Initialized; - Ok(()) - } - BlockState::Commited | BlockState::Registered => { - Err(Box::new(BlockError::InvalidStateTransition)) - } - } - } - - pub(in crate::blockstore::arrow_blockfile) fn commit(&self) -> Result<(), Box> { - let mut inner = self.inner.write(); - match inner.state { - BlockState::Uninitialized => Ok(()), - BlockState::Initialized => { - inner.state = BlockState::Commited; - Ok(()) - } - BlockState::Commited | BlockState::Registered => { - Err(Box::new(BlockError::InvalidStateTransition)) - } - } - } - - pub(super) fn iter(&self) -> BlockIterator { - BlockIterator::new( - self.clone(), - self.inner.read().key_type, - self.inner.read().value_type, - ) - } -} - -#[derive(Clone)] -pub(super) struct BlockData { - // Arrow record batch with the schema (prefix, key, value) - pub(super) data: RecordBatch, -} - -enum KeyBuilder { - StringBuilder(StringBuilder), - FloatBuilder(Float32Builder), -} - -enum ValueBuilder { - Int32ArrayValueBuilder(ListBuilder), - StringValueBuilder(StringBuilder), -} - -impl BlockData { - pub(crate) fn new(data: RecordBatch) -> Self { - Self { data } - } - - pub(crate) fn get_size(&self) -> usize { - let mut total_size = 0; - for column in self.data.columns() { - total_size += column.get_buffer_memory_size(); - } - total_size - } -} - -pub(super) struct BlockDataBuilder { - prefix_builder: StringBuilder, - key_builder: KeyBuilder, - value_builder: ValueBuilder, -} - -pub(super) struct BlockBuilderOptions { - pub(super) item_count: usize, - pub(super) prefix_data_capacity: usize, - pub(super) key_data_capacity: usize, - // The capacity of the value in the case of nested types is a total capacity. - // I.E. if you have a list of lists, the capacity is the total number of lists - // times the total number of items in each list. - // TODO: rethink the naming here. 
Data capacity vs number of items capacity - pub(super) total_value_count: usize, - pub(super) total_value_capacity: usize, -} - -impl BlockBuilderOptions { - pub(super) fn new( - item_count: usize, - prefix_data_capacity: usize, - key_data_capacity: usize, - total_value_count: usize, - total_value_capacity: usize, - ) -> Self { - Self { - item_count, - prefix_data_capacity, - key_data_capacity, - total_value_count, - total_value_capacity, - } - } - - pub(super) fn default() -> Self { - Self { - item_count: 1024, - prefix_data_capacity: 1024, - key_data_capacity: 1024, - total_value_count: 1024, - total_value_capacity: 1024, - } - } -} - -impl BlockDataBuilder { - pub(super) fn new( - key_type: KeyType, - value_type: ValueType, - options: Option, - ) -> Self { - let options = options.unwrap_or(BlockBuilderOptions::default()); - let prefix_builder = - StringBuilder::with_capacity(options.item_count, options.prefix_data_capacity); - let key_builder = match key_type { - KeyType::String => KeyBuilder::StringBuilder(StringBuilder::with_capacity( - options.item_count, - options.key_data_capacity, - )), - KeyType::Float => { - KeyBuilder::FloatBuilder(Float32Builder::with_capacity(options.item_count)) - } - }; - let value_builder = match value_type { - ValueType::Int32Array => { - ValueBuilder::Int32ArrayValueBuilder(ListBuilder::with_capacity( - Int32Builder::with_capacity(options.total_value_count), - options.item_count, - )) - } - ValueType::String => ValueBuilder::StringValueBuilder(StringBuilder::with_capacity( - options.item_count, - options.total_value_capacity, - )), - _ => unimplemented!(), - }; - Self { - prefix_builder, - key_builder, - value_builder, - } - } - - pub(super) fn add(&mut self, key: BlockfileKey, value: Value) { - // TODO: you must add in sorted order, error if not - self.prefix_builder.append_value(key.prefix); - match self.key_builder { - KeyBuilder::StringBuilder(ref mut builder) => match key.key { - Key::String(key) => { - builder.append_value(key); - } - _ => unreachable!("Invalid key type for block"), - }, - KeyBuilder::FloatBuilder(ref mut builder) => match key.key { - Key::Float(key) => { - builder.append_value(key); - } - _ => unreachable!("Invalid key type for block"), - }, - } - - match self.value_builder { - ValueBuilder::Int32ArrayValueBuilder(ref mut builder) => match value { - Value::Int32ArrayValue(array) => { - builder.append_value(&array); - } - _ => unimplemented!(), - }, - ValueBuilder::StringValueBuilder(ref mut builder) => match value { - Value::StringValue(string) => { - builder.append_value(string); - } - _ => unimplemented!(), - }, - } - } - - pub(super) fn build(&mut self) -> BlockData { - let prefix = self.prefix_builder.finish(); - let prefix_field = Field::new("prefix", DataType::Utf8, true); - // TODO: figure out how to get rid of nullable, the builders turn it on by default but we don't want it - let key_field; - let key = match self.key_builder { - KeyBuilder::StringBuilder(ref mut builder) => { - key_field = Field::new("key", DataType::Utf8, true); - let arr = builder.finish(); - (&arr as &dyn Array).slice(0, arr.len()) - } - KeyBuilder::FloatBuilder(ref mut builder) => { - key_field = Field::new("key", DataType::Float32, true); - let arr = builder.finish(); - (&arr as &dyn Array).slice(0, arr.len()) - } - }; - - let value_field; - let value = match self.value_builder { - ValueBuilder::Int32ArrayValueBuilder(ref mut builder) => { - value_field = Field::new( - "value", - DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), - 
true, - ); - let arr = builder.finish(); - (&arr as &dyn Array).slice(0, arr.len()) - } - ValueBuilder::StringValueBuilder(ref mut builder) => { - value_field = Field::new("value", DataType::Utf8, true); - let arr = builder.finish(); - (&arr as &dyn Array).slice(0, arr.len()) - } - }; - - let schema = Arc::new(arrow::datatypes::Schema::new(vec![ - prefix_field, - key_field, - value_field, - ])); - let record_batch = - RecordBatch::try_new(schema, vec![Arc::new(prefix), Arc::new(key), value]); - BlockData::new(record_batch.unwrap()) - } - - pub(super) fn get_size(&self) -> usize { - let size = 0; - size - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::blockstore::types::Key; - use arrow::array::Int32Array; - - #[test] - fn test_block_builder() { - let num_entries = 1000; - - let mut keys = Vec::new(); - let mut key_bytes = 0; - for i in 0..num_entries { - keys.push(Key::String(i.to_string())); - key_bytes += i.to_string().len(); - } - - let prefix = "key".to_string(); - let prefix_bytes = prefix.len() * num_entries; - let mut block_builder = BlockDataBuilder::new( - KeyType::String, - ValueType::Int32Array, - Some(BlockBuilderOptions::new( - num_entries, - prefix_bytes, - key_bytes, - num_entries * 2, // 2 int32s per entry - num_entries * 2 * 4, // 2 int32s per entry - )), - ); - - for i in 0..num_entries { - block_builder.add( - BlockfileKey::new(prefix.clone(), keys[i].clone()), - Value::Int32ArrayValue(Int32Array::from(vec![i as i32, (i + 1) as i32])), - ); - } - // let prefix_total_bytes = bit_util::round_upto_multiple_of_64(prefix_bytes) - // + bit_util::round_upto_multiple_of_64(4 * num_entries); - // let key_total_bytes = bit_util::round_upto_multiple_of_64(key_bytes) - // + bit_util::round_upto_multiple_of_64(4 * num_entries); - // let value_bytes = bit_util::round_upto_multiple_of_64(4 * num_entries) - // + bit_util::round_upto_multiple_of_64(4 * num_entries); - - // println!("Expected prefix total size: {}", prefix_total_bytes); - // println!("Expected key total size: {}", key_total_bytes); - // let block_data = block_builder.build(); - // let size = block_data.get_size(); - // println!( - // "Predicted size: {}: Actual size: {}", - // size, - // block_data.get_size() - // ); - } -} diff --git a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs deleted file mode 100644 index e2c92009dc8..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs +++ /dev/null @@ -1,472 +0,0 @@ -use super::super::types::{Blockfile, BlockfileKey, Key, KeyType, Value, ValueType}; -use super::block::BlockState; -use super::provider::ArrowBlockProvider; -use super::sparse_index::SparseIndex; -use crate::blockstore::arrow_blockfile::block::delta::BlockDelta; -use uuid::Uuid; - -pub(super) const MAX_BLOCK_SIZE: usize = 16384; - -// TODO: Think about the clone here -#[derive(Clone)] -pub(crate) struct ArrowBlockfile { - sparse_index: SparseIndex, - key_type: KeyType, - value_type: ValueType, - transaction_state: Option, - block_provider: ArrowBlockProvider, -} - -// TODO: Remove this clone -#[derive(Clone)] -struct TransactionState { - block_delta: Vec, - new_sparse_index: Option, -} - -impl TransactionState { - fn new() -> Self { - Self { - block_delta: Vec::new(), - new_sparse_index: None, - } - } - - fn add_delta(&mut self, delta: BlockDelta) { - self.block_delta.push(delta); - } - - fn get_delta_for_block(&self, search_id: &Uuid) -> Option { - for delta in &self.block_delta { - if delta.source_block.get_id() 
== *search_id { - return Some(delta.clone()); - } - } - None - } -} - -impl Blockfile for ArrowBlockfile { - fn get(&self, key: BlockfileKey) -> Result> { - let target_block_id = self.sparse_index.get_target_block_id(&key); - let target_block = match self.block_provider.get_block(&target_block_id) { - None => panic!("Block not found"), // TODO: This should not panic tbh - Some(block) => block, - }; - let value = target_block.get(&key); - match value { - None => panic!("Key not found"), // TODO: This should not panic tbh - Some(value) => Ok(value), - } - } - - fn get_by_prefix( - &self, - prefix: String, - ) -> Result, Box> { - unimplemented!(); - } - - fn get_gt( - &self, - prefix: String, - key: Key, - ) -> Result, Box> { - unimplemented!(); - } - - fn get_gte( - &self, - prefix: String, - key: Key, - ) -> Result, Box> { - unimplemented!(); - } - - fn get_lt( - &self, - prefix: String, - key: Key, - ) -> Result, Box> { - unimplemented!(); - } - - fn get_lte( - &self, - prefix: String, - key: Key, - ) -> Result, Box> { - unimplemented!(); - } - - fn set( - &mut self, - key: BlockfileKey, - value: Value, - ) -> Result<(), Box> { - // TODO: value must be smaller than the block size except for position lists, which are a special case - // where we split the value across multiple blocks - if !self.in_transaction() { - panic!("Transaction not in progress"); - } - - // Validate key type - match key.key { - Key::String(_) => { - if self.key_type != KeyType::String { - panic!("Invalid key type"); - } - } - Key::Float(_) => { - if self.key_type != KeyType::Float { - panic!("Invalid key type"); - } - } - } - - // Validate value type - match value { - Value::Int32ArrayValue(_) => { - if self.value_type != ValueType::Int32Array { - panic!("Invalid value type"); - } - } - Value::StringValue(_) => { - if self.value_type != ValueType::String { - panic!("Invalid value type"); - } - } - Value::PositionalPostingListValue(_) => { - if self.value_type != ValueType::PositionalPostingList { - panic!("Invalid value type"); - } - } - Value::RoaringBitmapValue(_) => { - if self.value_type != ValueType::RoaringBitmap { - panic!("Invalid value type"); - } - } - } - - let transaction_state = match self.transaction_state { - None => panic!("Transaction not in progress"), - Some(ref mut state) => state, - }; - - let target_block_id = match transaction_state.new_sparse_index { - None => self.sparse_index.get_target_block_id(&key), - Some(ref index) => index.get_target_block_id(&key), - }; - - // for debugging - let target_block_id_string = target_block_id.to_string(); - - let delta = match transaction_state.get_delta_for_block(&target_block_id) { - None => { - println!("Creating new block delta"); - let target_block = match self.block_provider.get_block(&target_block_id) { - None => panic!("Block not found"), // TODO: This should not panic tbh - Some(block) => block, - }; - let delta = BlockDelta::from(target_block); - println!("New delta has size: {}", delta.get_size()); - transaction_state.add_delta(delta.clone()); - delta - } - Some(delta) => delta, - }; - - if delta.can_add(&key, &value) { - delta.add(key, value); - } else { - let (split_key, new_delta) = delta.split(&self.block_provider); - match transaction_state.new_sparse_index { - None => { - let mut new_sparse_index = SparseIndex::from(&self.sparse_index); - new_sparse_index.add_block(split_key, new_delta.source_block.get_id()); - transaction_state.new_sparse_index = Some(new_sparse_index); - } - Some(ref mut index) => { - index.add_block(split_key, 
new_delta.source_block.get_id()); - } - } - transaction_state.add_delta(new_delta); - self.set(key, value)? - } - Ok(()) - } - - fn begin_transaction(&mut self) -> Result<(), Box> { - if self.in_transaction() { - // TODO: return error - panic!("Transaction already in progress"); - } - self.transaction_state = Some(TransactionState::new()); - Ok(()) - } - - fn commit_transaction(&mut self) -> Result<(), Box> { - if !self.in_transaction() { - panic!("Transaction not in progress"); - } - - let transaction_state = match self.transaction_state { - None => panic!("Transaction not in progress"), // TODO: make error - Some(ref mut state) => state, - }; - - for delta in &transaction_state.block_delta { - // build a new block and replace the blockdata in the block - // TOOO: the data capacities need to include the offsets and padding, not just the raw data size - // let new_block_data = BlockData::from(delta); - - // TODO: thinking about an edge case here: someone could register while we are in a transaction, and then we would have to handle that - // in that case, update_data() can fail, since the block is registered, and we would have to create a new block and update the sparse index - - // Blocks are WORM, so if the block is uninitialized or initialized we can update it directly, if its registered, meaning the broader system is aware of it, - // we need to create a new block and update the sparse index to point to the new block - - match delta.source_block.get_state() { - BlockState::Uninitialized => { - delta.source_block.apply_delta(delta); - delta.source_block.commit(); - println!( - "Size of commited block in bytes: {} with len {}", - delta.source_block.get_size(), - delta.source_block.len() - ); - } - BlockState::Initialized => { - delta.source_block.apply_delta(delta); - delta.source_block.commit(); - println!( - "Size of commited block in bytes: {} with len {}", - delta.source_block.get_size(), - delta.source_block.len() - ); - } - BlockState::Commited | BlockState::Registered => { - // If the block is commited or registered, we need to create a new block and update the sparse index - let new_block = self - .block_provider - .create_block(self.key_type, self.value_type); - new_block.apply_delta(delta); - let new_min_key = match delta.get_min_key() { - None => panic!("No start key"), - Some(key) => key, - }; - match transaction_state.new_sparse_index { - None => { - let mut new_sparse_index = SparseIndex::from(&self.sparse_index); - new_sparse_index.replace_block( - delta.source_block.get_id(), - new_block.get_id(), - new_min_key, - ); - transaction_state.new_sparse_index = Some(new_sparse_index); - } - Some(ref mut index) => { - index.replace_block( - delta.source_block.get_id(), - new_block.get_id(), - new_min_key, - ); - } - } - new_block.commit(); - println!( - "Size of commited block in bytes: {} with len {}", - new_block.get_size(), - new_block.len() - ); - } - } - } - - // update the sparse index - if transaction_state.new_sparse_index.is_some() { - self.sparse_index = transaction_state.new_sparse_index.take().unwrap(); - // unwrap is safe because we just checked it - } - println!("New sparse index after commit: {:?}", self.sparse_index); - - self.transaction_state = None; - Ok(()) - } -} - -impl ArrowBlockfile { - pub(super) fn new( - key_type: KeyType, - value_type: ValueType, - block_provider: ArrowBlockProvider, - ) -> Self { - let initial_block = block_provider.create_block(key_type.clone(), value_type.clone()); - Self { - sparse_index: SparseIndex::new(initial_block.get_id()), 
- transaction_state: None, - block_provider, - key_type, - value_type, - } - } - - fn in_transaction(&self) -> bool { - self.transaction_state.is_some() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use arrow::array::Int32Array; - - #[test] - fn test_blockfile() { - let block_provider = ArrowBlockProvider::new(); - let mut blockfile = - ArrowBlockfile::new(KeyType::String, ValueType::Int32Array, block_provider); - - blockfile.begin_transaction().unwrap(); - let key1 = BlockfileKey::new("key".to_string(), Key::String("zzzz".to_string())); - blockfile - .set( - key1.clone(), - Value::Int32ArrayValue(Int32Array::from(vec![1, 2, 3])), - ) - .unwrap(); - let key2 = BlockfileKey::new("key".to_string(), Key::String("aaaa".to_string())); - blockfile - .set( - key2, - Value::Int32ArrayValue(Int32Array::from(vec![4, 5, 6])), - ) - .unwrap(); - blockfile.commit_transaction().unwrap(); - - let value = blockfile.get(key1).unwrap(); - match value { - Value::Int32ArrayValue(array) => { - assert_eq!(array.values(), &[1, 2, 3]); - } - _ => panic!("Unexpected value type"), - } - } - - #[test] - fn test_splitting() { - let block_provider = ArrowBlockProvider::new(); - let mut blockfile = - ArrowBlockfile::new(KeyType::String, ValueType::Int32Array, block_provider); - - blockfile.begin_transaction().unwrap(); - let n = 1200; - for i in 0..n { - let string_key = format!("{:04}", i); - let key = BlockfileKey::new("key".to_string(), Key::String(string_key)); - blockfile - .set(key, Value::Int32ArrayValue(Int32Array::from(vec![i]))) - .unwrap(); - } - blockfile.commit_transaction().unwrap(); - - for i in 0..n { - let string_key = format!("{:04}", i); - let key = BlockfileKey::new("key".to_string(), Key::String(string_key)); - let res = blockfile.get(key).unwrap(); - match res { - Value::Int32ArrayValue(array) => { - assert_eq!(array.values(), &[i]); - } - _ => panic!("Unexpected value type"), - } - } - - // Sparse index should have 3 blocks - assert_eq!(blockfile.sparse_index.len(), 3); - assert!(blockfile.sparse_index.is_valid()); - - // Add 5 new entries to the first block - blockfile.begin_transaction().unwrap(); - for i in 0..5 { - let new_key = format! {"{:05}", i}; - let key = BlockfileKey::new("key".to_string(), Key::String(new_key)); - blockfile - .set(key, Value::Int32ArrayValue(Int32Array::from(vec![i]))) - .unwrap(); - } - blockfile.commit_transaction().unwrap(); - - // Sparse index should still have 3 blocks - assert_eq!(blockfile.sparse_index.len(), 3); - assert!(blockfile.sparse_index.is_valid()); - - // Add 1200 more entries, causing splits - blockfile.begin_transaction().unwrap(); - for i in n..n * 2 { - let new_key = format! 
{"{:04}", i}; - let key = BlockfileKey::new("key".to_string(), Key::String(new_key)); - blockfile - .set(key, Value::Int32ArrayValue(Int32Array::from(vec![i]))) - .unwrap(); - } - blockfile.commit_transaction().unwrap(); - } - - #[test] - fn test_string_value() { - let block_provider = ArrowBlockProvider::new(); - let mut blockfile = ArrowBlockfile::new(KeyType::String, ValueType::String, block_provider); - - blockfile.begin_transaction().unwrap(); - let n = 2000; - - for i in 0..n { - let string_key = format!("{:04}", i); - let key = BlockfileKey::new("key".to_string(), Key::String(string_key.clone())); - blockfile - .set(key, Value::StringValue(string_key.clone())) - .unwrap(); - } - blockfile.commit_transaction().unwrap(); - - for i in 0..n { - let string_key = format!("{:04}", i); - let key = BlockfileKey::new("key".to_string(), Key::String(string_key.clone())); - let res = blockfile.get(key).unwrap(); - match res { - Value::StringValue(string) => { - assert_eq!(string, string_key); - } - _ => panic!("Unexpected value type"), - } - } - } - - #[test] - fn test_int_key() { - let block_provider = ArrowBlockProvider::new(); - let mut blockfile = ArrowBlockfile::new(KeyType::Float, ValueType::String, block_provider); - - blockfile.begin_transaction().unwrap(); - let n = 2000; - for i in 0..n { - let key = BlockfileKey::new("key".to_string(), Key::Float(i as f32)); - blockfile - .set(key, Value::StringValue(format!("{:04}", i))) - .unwrap(); - } - blockfile.commit_transaction().unwrap(); - - for i in 0..n { - let key = BlockfileKey::new("key".to_string(), Key::Float(i as f32)); - let res = blockfile.get(key).unwrap(); - match res { - Value::StringValue(string) => { - assert_eq!(string, format!("{:04}", i)); - } - _ => panic!("Unexpected value type"), - } - } - } -} diff --git a/rust/worker/src/blockstore/arrow_blockfile/mod.rs b/rust/worker/src/blockstore/arrow_blockfile/mod.rs deleted file mode 100644 index fdff38999eb..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod block; -mod blockfile; -mod provider; -mod sparse_index; diff --git a/rust/worker/src/blockstore/arrow_blockfile/provider.rs b/rust/worker/src/blockstore/arrow_blockfile/provider.rs deleted file mode 100644 index 2af0a55d078..00000000000 --- a/rust/worker/src/blockstore/arrow_blockfile/provider.rs +++ /dev/null @@ -1,84 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use parking_lot::RwLock; -use uuid::Uuid; - -use super::super::provider::BlockfileProvider; -use crate::blockstore::arrow_blockfile::block::Block; -use crate::blockstore::arrow_blockfile::blockfile::ArrowBlockfile; -use crate::blockstore::provider::{CreateError, OpenError}; -use crate::blockstore::types::{Blockfile, KeyType, ValueType}; - -pub(super) struct ArrowBlockfileProvider { - block_provider: ArrowBlockProvider, - files: HashMap>, -} - -impl BlockfileProvider for ArrowBlockfileProvider { - fn new() -> Self { - Self { - block_provider: ArrowBlockProvider::new(), - files: HashMap::new(), - } - } - - fn open(&self, path: &str) -> Result, Box> { - match self.files.get(path) { - Some(file) => Ok(file.clone()), - None => Err(Box::new(OpenError::NotFound)), - } - } - - fn create( - &mut self, - path: &str, - key_type: KeyType, - value_type: ValueType, - ) -> Result, Box> { - match self.files.get(path) { - Some(_) => Err(Box::new(CreateError::AlreadyExists)), - None => { - let blockfile = Box::new(ArrowBlockfile::new( - key_type, - value_type, - self.block_provider.clone(), - )); - 
                self.files.insert(path.to_string(), blockfile);
-                Ok(self.files.get(path).unwrap().clone())
-            }
-        }
-    }
-}
-
-struct ArrowBlockProviderInner {
-    blocks: HashMap<Uuid, Arc<Block>>,
-}
-
-#[derive(Clone)]
-pub(super) struct ArrowBlockProvider {
-    inner: Arc<RwLock<ArrowBlockProviderInner>>,
-}
-
-impl ArrowBlockProvider {
-    pub(super) fn new() -> Self {
-        Self {
-            inner: Arc::new(RwLock::new(ArrowBlockProviderInner {
-                blocks: HashMap::new(),
-            })),
-        }
-    }
-
-    pub(super) fn create_block(&self, key_type: KeyType, value_type: ValueType) -> Arc<Block> {
-        let block = Arc::new(Block::new(Uuid::new_v4(), key_type, value_type));
-        self.inner
-            .write()
-            .blocks
-            .insert(block.get_id(), block.clone());
-        block
-    }
-
-    pub(super) fn get_block(&self, id: &Uuid) -> Option<Arc<Block>> {
-        self.inner.read().blocks.get(id).cloned()
-    }
-}
diff --git a/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs b/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs
deleted file mode 100644
index e1d4f604885..00000000000
--- a/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs
+++ /dev/null
@@ -1,196 +0,0 @@
-use crate::blockstore::types::Key;
-use crate::blockstore::types::{Blockfile, BlockfileKey};
-use std::collections::{BTreeMap, HashMap};
-use std::fmt::Debug;
-use uuid::Uuid;
-
-// A sentinel blockfilekey wrapper to represent the start blocks range
-#[derive(Clone, Debug)]
-pub(super) enum SparseIndexDelimiter {
-    Start,
-    Key(BlockfileKey),
-}
-
-impl PartialEq for SparseIndexDelimiter {
-    fn eq(&self, other: &Self) -> bool {
-        match (self, other) {
-            (SparseIndexDelimiter::Start, SparseIndexDelimiter::Start) => true,
-            (SparseIndexDelimiter::Key(k1), SparseIndexDelimiter::Key(k2)) => k1 == k2,
-            _ => false,
-        }
-    }
-}
-
-impl Eq for SparseIndexDelimiter {}
-
-impl PartialOrd for SparseIndexDelimiter {
-    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
-        match (self, other) {
-            (SparseIndexDelimiter::Start, SparseIndexDelimiter::Start) => {
-                Some(std::cmp::Ordering::Equal)
-            }
-            (SparseIndexDelimiter::Start, SparseIndexDelimiter::Key(_)) => {
-                Some(std::cmp::Ordering::Less)
-            }
-            (SparseIndexDelimiter::Key(_), SparseIndexDelimiter::Start) => {
-                Some(std::cmp::Ordering::Greater)
-            }
-            (SparseIndexDelimiter::Key(k1), SparseIndexDelimiter::Key(k2)) => k1.partial_cmp(k2),
-        }
-    }
-}
-
-impl Ord for SparseIndexDelimiter {
-    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
-        match (self, other) {
-            (SparseIndexDelimiter::Start, SparseIndexDelimiter::Start) => std::cmp::Ordering::Equal,
-            (SparseIndexDelimiter::Start, SparseIndexDelimiter::Key(_)) => std::cmp::Ordering::Less,
-            (SparseIndexDelimiter::Key(_), SparseIndexDelimiter::Start) => {
-                std::cmp::Ordering::Greater
-            }
-            (SparseIndexDelimiter::Key(k1), SparseIndexDelimiter::Key(k2)) => k1.cmp(k2),
-        }
-    }
-}
-
-// TODO: remove clone here
-#[derive(Clone)]
-pub(super) struct SparseIndex {
-    forward: BTreeMap<SparseIndexDelimiter, Uuid>,
-    reverse: HashMap<Uuid, SparseIndexDelimiter>,
-}
-
-impl SparseIndex {
-    pub(super) fn new(initial_block_id: Uuid) -> Self {
-        let mut forward = BTreeMap::new();
-        forward.insert(SparseIndexDelimiter::Start, initial_block_id);
-        let mut reverse = HashMap::new();
-        reverse.insert(initial_block_id, SparseIndexDelimiter::Start);
-        Self { forward, reverse }
-    }
-
-    pub(super) fn from(old_sparse_index: &SparseIndex) -> Self {
-        Self {
-            forward: old_sparse_index.forward.clone(),
-            reverse: old_sparse_index.reverse.clone(),
-        }
-    }
-
-    pub(super) fn get_target_block_id(&self, search_key: &BlockfileKey) -> Uuid {
-        let mut iter_curr = self.forward.iter();
-        let mut iter_next = self.forward.iter().skip(1);
-        let search_key = SparseIndexDelimiter::Key(search_key.clone());
-        while let Some((curr_key, curr_block_id)) = iter_curr.next() {
-            if let Some((next_key, _)) = iter_next.next() {
-                if search_key >= *curr_key && search_key < *next_key {
-                    return *curr_block_id;
-                }
-            } else {
-                return *curr_block_id;
-            }
-        }
-        panic!("No blocks in the sparse index");
-    }
-
-    pub(super) fn add_block(&mut self, start_key: BlockfileKey, block_id: Uuid) {
-        self.forward
-            .insert(SparseIndexDelimiter::Key(start_key.clone()), block_id);
-        self.reverse
-            .insert(block_id, SparseIndexDelimiter::Key(start_key));
-    }
-
-    pub(super) fn replace_block(
-        &mut self,
-        old_block_id: Uuid,
-        new_block_id: Uuid,
-        new_start_key: BlockfileKey,
-    ) {
-        if let Some(old_start_key) = self.reverse.remove(&old_block_id) {
-            self.forward.remove(&old_start_key);
-            if old_start_key == SparseIndexDelimiter::Start {
-                self.forward
-                    .insert(SparseIndexDelimiter::Start, new_block_id);
-            } else {
-                self.forward
-                    .insert(SparseIndexDelimiter::Key(new_start_key), new_block_id);
-            }
-        }
-    }
-
-    pub(super) fn len(&self) -> usize {
-        self.forward.len()
-    }
-
-    pub(super) fn is_valid(&self) -> bool {
-        let mut first = true;
-        let mut iter_curr = self.forward.iter();
-        let mut iter_next = self.forward.iter().skip(1);
-        while let Some((curr_key, _)) = iter_curr.next() {
-            if first {
-                if curr_key != &SparseIndexDelimiter::Start {
-                    return false;
-                }
-                first = false;
-            }
-            if let Some((next_key, _)) = iter_next.next() {
-                if curr_key >= next_key {
-                    return false;
-                }
-            }
-        }
-        true
-    }
-}
-
-impl Debug for SparseIndex {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "SparseIndex {{")?;
-        for (k, v) in self.forward.iter() {
-            write!(f, "\n {:?} -> {:?}", k, v)?;
-        }
-        write!(f, "\n}}")
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::blockstore::types::Key;
-
-    #[test]
-    fn test_sparse_index() {
-        let mut block_id_1 = uuid::Uuid::new_v4();
-        let mut sparse_index = SparseIndex::new(block_id_1);
-        let mut blockfile_key =
-            BlockfileKey::new("prefix".to_string(), Key::String("a".to_string()));
-        sparse_index.add_block(blockfile_key.clone(), block_id_1);
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_1);
-
-        blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("b".to_string()));
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_1);
-
-        // Split the range into two blocks (start, c), and (c, end)
-        let block_id_2 = uuid::Uuid::new_v4();
-        blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("c".to_string()));
-        sparse_index.add_block(blockfile_key.clone(), block_id_2);
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_2);
-
-        // d should fall into the second block
-        blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("d".to_string()));
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_2);
-
-        // Split the second block into (c, f) and (f, end)
-        let block_id_3 = uuid::Uuid::new_v4();
-        blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("f".to_string()));
-        sparse_index.add_block(blockfile_key.clone(), block_id_3);
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_3);
-
-        // g should fall into the third block
-        blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("g".to_string()));
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_3);
-
-        // b should fall into the first block
-        blockfile_key = BlockfileKey::new("prefix".to_string(), Key::String("b".to_string()));
-        assert_eq!(sparse_index.get_target_block_id(&blockfile_key), block_id_1);
-    }
-}
diff --git a/rust/worker/src/blockstore/mod.rs b/rust/worker/src/blockstore/mod.rs
index 4facc35f85d..7a9d5d38b7e 100644
--- a/rust/worker/src/blockstore/mod.rs
+++ b/rust/worker/src/blockstore/mod.rs
@@ -1,4 +1,3 @@
-mod arrow_blockfile;
 mod positional_posting_list_value;
 mod types;