Skip to content

Commit

Permalink
[ENH] Add Arrow-backed blockfile
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 9, 2024
1 parent 8df4656 commit 6df7055
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 58 deletions.
143 changes: 86 additions & 57 deletions rust/worker/src/blockstore/arrow_blockfile/blockfile.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use super::super::types::{Blockfile, BlockfileKey, Key, KeyType, Value, ValueType};
use super::block::BlockState;
use super::block::{BlockError, BlockState};
use super::provider::ArrowBlockProvider;
use super::sparse_index::SparseIndex;
use crate::blockstore::arrow_blockfile::block::delta::BlockDelta;
use crate::blockstore::BlockfileError;
use crate::errors::ChromaError;
use parking_lot::Mutex;
use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;

pub(super) const MAX_BLOCK_SIZE: usize = 16384;

// TODO: Think about the clone here
#[derive(Clone)]
pub(crate) struct ArrowBlockfile {
key_type: KeyType,
Expand Down Expand Up @@ -48,16 +50,36 @@ impl TransactionState {
}
}

/// Errors specific to the Arrow-backed blockfile implementation.
///
/// These are boxed as `dyn ChromaError` and returned from the `Blockfile`
/// methods of `ArrowBlockfile` instead of panicking.
#[derive(Error, Debug)]
pub(crate) enum ArrowBlockfileError {
/// The block provider returned no block for the id chosen by the sparse index.
#[error("Block not found")]
BlockNotFoundError,
/// A block-level operation (applying a delta or committing a block) failed;
/// wraps the underlying `BlockError`.
#[error("Block Error")]
BlockError(#[from] BlockError),
/// A delta had no minimum key when the sparse index needed one while
/// committing into a replacement block.
#[error("No split key found")]
NoSplitKeyFound,
}

impl ChromaError for ArrowBlockfileError {
fn code(&self) -> crate::errors::ErrorCodes {
match self {
ArrowBlockfileError::BlockNotFoundError => crate::errors::ErrorCodes::NotFound,
ArrowBlockfileError::BlockError(err) => err.code(),
ArrowBlockfileError::NoSplitKeyFound => crate::errors::ErrorCodes::Internal,
}
}
}

impl Blockfile for ArrowBlockfile {
fn get(&self, key: BlockfileKey) -> Result<Value, Box<dyn crate::errors::ChromaError>> {
fn get(&self, key: BlockfileKey) -> Result<Value, Box<dyn ChromaError>> {
let target_block_id = self.sparse_index.lock().get_target_block_id(&key);
let target_block = match self.block_provider.get_block(&target_block_id) {
None => panic!("Block not found"), // TODO: This should not panic tbh
None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)),
Some(block) => block,
};
let value = target_block.get(&key);
match value {
None => panic!("Key not found"), // TODO: This should not panic tbh
None => return Err(Box::new(BlockfileError::NotFoundError)),
Some(value) => Ok(value),
}
}
Expand Down Expand Up @@ -109,24 +131,24 @@ impl Blockfile for ArrowBlockfile {
// TODO: value must be smaller than the block size except for position lists, which are a special case
// where we split the value across multiple blocks
if !self.in_transaction() {
panic!("Transaction not in progress");
return Err(Box::new(BlockfileError::TransactionNotInProgress));
}

// Validate key type
match key.key {
Key::String(_) => {
if self.key_type != KeyType::String {
panic!("Invalid key type");
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
Key::Float(_) => {
if self.key_type != KeyType::Float {
panic!("Invalid key type");
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
Key::Bool(_) => {
if self.key_type != KeyType::Bool {
panic!("Invalid key type");
return Err(Box::new(BlockfileError::InvalidKeyType));
}
}
}
Expand All @@ -135,33 +157,33 @@ impl Blockfile for ArrowBlockfile {
match value {
Value::Int32ArrayValue(_) => {
if self.value_type != ValueType::Int32Array {
panic!("Invalid value type");
return Err(Box::new(BlockfileError::InvalidValueType));
}
}
Value::StringValue(_) => {
if self.value_type != ValueType::String {
panic!("Invalid value type");
return Err(Box::new(BlockfileError::InvalidValueType));
}
}
Value::Int32Value(_) => {
if self.value_type != ValueType::Int32 {
panic!("Invalid value type");
return Err(Box::new(BlockfileError::InvalidValueType));
}
}
Value::PositionalPostingListValue(_) => {
if self.value_type != ValueType::PositionalPostingList {
panic!("Invalid value type");
return Err(Box::new(BlockfileError::InvalidValueType));
}
}
Value::RoaringBitmapValue(_) => {
if self.value_type != ValueType::RoaringBitmap {
panic!("Invalid value type");
return Err(Box::new(BlockfileError::InvalidValueType));
}
}
}

let transaction_state = match &self.transaction_state {
None => panic!("Transaction not in progress"),
None => return Err(Box::new(BlockfileError::TransactionNotInProgress)),
Some(transaction_state) => transaction_state,
};

Expand All @@ -174,7 +196,7 @@ impl Blockfile for ArrowBlockfile {
let delta = match transaction_state.get_delta_for_block(&target_block_id) {
None => {
let target_block = match self.block_provider.get_block(&target_block_id) {
None => panic!("Block not found"), // TODO: This should not panic tbh
None => return Err(Box::new(ArrowBlockfileError::BlockNotFoundError)),
Some(block) => block,
};
let delta = BlockDelta::from(target_block);
Expand All @@ -190,7 +212,7 @@ impl Blockfile for ArrowBlockfile {
let (split_key, new_delta) = delta.split(&self.block_provider);
match *transaction_sparse_index {
None => {
let mut new_sparse_index =
let new_sparse_index =
Arc::new(Mutex::new(SparseIndex::from(&self.sparse_index.lock())));
new_sparse_index
.lock()
Expand All @@ -212,20 +234,19 @@ impl Blockfile for ArrowBlockfile {

/// Starts a new write transaction on this blockfile.
///
/// On success a fresh `TransactionState` is installed in
/// `self.transaction_state`.
///
/// # Errors
///
/// Returns `BlockfileError::TransactionInProgress` if `in_transaction()`
/// already reports an open transaction. This is reported as an error rather
/// than a panic so the caller can decide how to recover.
fn begin_transaction(&mut self) -> Result<(), Box<dyn crate::errors::ChromaError>> {
    if self.in_transaction() {
        return Err(Box::new(BlockfileError::TransactionInProgress));
    }
    self.transaction_state = Some(Arc::new(TransactionState::new()));
    Ok(())
}

fn commit_transaction(&mut self) -> Result<(), Box<dyn crate::errors::ChromaError>> {
if !self.in_transaction() {
panic!("Transaction not in progress");
return Err(Box::new(BlockfileError::TransactionNotInProgress));
}

let mut transaction_state = match self.transaction_state {
None => panic!("Transaction not in progress"),
let transaction_state = match self.transaction_state {
None => return Err(Box::new(BlockfileError::TransactionNotInProgress)),
Some(ref transaction_state) => transaction_state,
};

Expand All @@ -235,37 +256,53 @@ impl Blockfile for ArrowBlockfile {

match delta.source_block.get_state() {
BlockState::Uninitialized => {
delta.source_block.apply_delta(&delta);
delta.source_block.commit();
println!(
"Size of commited block in bytes: {} with len {}",
delta.source_block.get_size(),
delta.source_block.len()
);
match delta.source_block.apply_delta(&delta) {
Ok(_) => {}
Err(err) => {
return Err(Box::new(ArrowBlockfileError::BlockError(*err)));
}
}
match delta.source_block.commit() {
Ok(_) => {}
Err(err) => {
return Err(Box::new(ArrowBlockfileError::BlockError(*err)));
}
}
}
BlockState::Initialized => {
delta.source_block.apply_delta(&delta);
delta.source_block.commit();
println!(
"Size of commited block in bytes: {} with len {}",
delta.source_block.get_size(),
delta.source_block.len()
);
match delta.source_block.apply_delta(&delta) {
Ok(_) => {}
Err(err) => {
return Err(Box::new(ArrowBlockfileError::BlockError(*err)));
}
}
match delta.source_block.commit() {
Ok(_) => {}
Err(err) => {
return Err(Box::new(ArrowBlockfileError::BlockError(*err)));
}
}
}
BlockState::Commited | BlockState::Registered => {
// If the block is committed or registered, we need to create a new block and update the sparse index
let new_block = self
.block_provider
.create_block(self.key_type, self.value_type);
new_block.apply_delta(&delta);
match new_block.apply_delta(&delta) {
Ok(_) => {}
Err(err) => {
return Err(Box::new(ArrowBlockfileError::BlockError(*err)));
}
}
let new_min_key = match delta.get_min_key() {
None => panic!("No start key"),
// This should never happen. We don't panic here because we want to return a proper error
None => return Err(Box::new(ArrowBlockfileError::NoSplitKeyFound)),
Some(key) => key,
};
let mut transaction_sparse_index = transaction_state.new_sparse_index.lock();
match *transaction_sparse_index {
None => {
let mut new_sparse_index =
let new_sparse_index =
Arc::new(Mutex::new(SparseIndex::from(&self.sparse_index.lock())));
new_sparse_index.lock().replace_block(
delta.source_block.get_id(),
Expand All @@ -282,12 +319,12 @@ impl Blockfile for ArrowBlockfile {
);
}
}
new_block.commit();
println!(
"Size of commited block in bytes: {} with len {}",
new_block.get_size(),
new_block.len()
);
match new_block.commit() {
Ok(_) => {}
Err(err) => {
return Err(Box::new(ArrowBlockfileError::BlockError(*err)));
}
}
}
}
}
Expand Down Expand Up @@ -502,24 +539,16 @@ mod tests {
}
blockfile.commit_transaction().unwrap();

// Print size of each block
for block_id in blockfile.sparse_index.lock().block_ids() {
let block = blockfile.block_provider.get_block(block_id).unwrap();
println!(
"Size of block {} in bytes: {} with len {}",
block_id,
block.get_size(),
block.len()
);
}

for i in 0..n {
let key = BlockfileKey::new("key".to_string(), Key::String(format!("{:04}", i)));
let res = blockfile.get(key).unwrap();
match res {
Value::RoaringBitmapValue(bitmap) => {
assert_eq!(bitmap.len(), i as u64);
// TODO: check it contains the right values
assert_eq!(
bitmap.iter().collect::<Vec<u32>>(),
(0..i).collect::<Vec<u32>>()
);
}
_ => panic!("Unexpected value type"),
}
Expand Down
15 changes: 14 additions & 1 deletion rust/worker/src/blockstore/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,25 @@ use thiserror::Error;
/// Errors shared by blockfile implementations.
pub(crate) enum BlockfileError {
/// The requested key is not present in the block that should contain it.
#[error("Key not found")]
NotFoundError,
/// The variant of the supplied key does not match the blockfile's key type.
#[error("Invalid Key Type")]
InvalidKeyType,
/// The variant of the supplied value does not match the blockfile's value type.
#[error("Invalid Value Type")]
InvalidValueType,
/// A transaction was started while another one was still open.
#[error("Transaction already in progress")]
TransactionInProgress,
/// An operation that requires an open transaction was called without one.
#[error("Transaction not in progress")]
TransactionNotInProgress,
}

impl ChromaError for BlockfileError {
    /// Maps each blockfile error onto the crate-wide error code.
    ///
    /// Lookup and type-mismatch failures are reported as `InvalidArgument`;
    /// calling a transactional operation in the wrong transaction state is
    /// reported as `FailedPrecondition`.
    fn code(&self) -> ErrorCodes {
        match self {
            // NOTE(review): `NotFoundError` maps to `InvalidArgument`, while the
            // Arrow blockfile's "block not found" error uses
            // `ErrorCodes::NotFound`. Kept as-is to avoid changing what callers
            // observe; confirm whether this is intentional.
            BlockfileError::NotFoundError
            | BlockfileError::InvalidKeyType
            | BlockfileError::InvalidValueType => ErrorCodes::InvalidArgument,
            BlockfileError::TransactionInProgress | BlockfileError::TransactionNotInProgress => {
                ErrorCodes::FailedPrecondition
            }
        }
    }
}
Expand Down

0 comments on commit 6df7055

Please sign in to comment.