Skip to content

Commit

Permalink
pr review
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 13, 2024
1 parent c2b38f9 commit b3a0456
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
26 changes: 21 additions & 5 deletions rust/worker/src/blockstore/arrow_blockfile/blockfile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use uuid::Uuid;

/// Upper bound on the size of a single block.
// NOTE(review): units are presumably bytes (16 KiB), and this is presumably the limit that
// triggers a block split when adding to a delta — confirm against the delta size check.
pub(super) const MAX_BLOCK_SIZE: usize = 16384;

/// ArrowBlockfile is a blockfile implementation that uses Apache Arrow for the block storage.
/// It stores a sparse index over a set of blocks sorted by key.
/// It uses a block provider to create new blocks and to retrieve existing blocks.
#[derive(Clone)]
pub(crate) struct ArrowBlockfile {
key_type: KeyType,
Expand All @@ -21,24 +24,31 @@ pub(crate) struct ArrowBlockfile {
transaction_state: Option<Arc<TransactionState>>,
}

/// TransactionState is a helper struct to keep track of the state of a transaction.
/// It keeps a list of block deltas that are applied during the transaction and the new
/// sparse index that is created during the transaction. The sparse index is immutable
/// so we can replace the sparse index of the blockfile with the new sparse index after
/// the transaction is committed.
struct TransactionState {
block_delta: Mutex<Vec<BlockDelta>>,
new_sparse_index: Mutex<Option<Arc<Mutex<SparseIndex>>>>,
sparse_index: Mutex<Option<Arc<Mutex<SparseIndex>>>>,
}

impl TransactionState {
fn new() -> Self {
Self {
block_delta: Mutex::new(Vec::new()),
new_sparse_index: Mutex::new(None),
sparse_index: Mutex::new(None),
}
}

/// Record `delta` as part of this transaction by appending it to the
/// transaction's list of block deltas.
fn add_delta(&self, delta: BlockDelta) {
    // The lock guard is a temporary, released as soon as the push completes.
    self.block_delta.lock().push(delta);
}

/// Get the block delta for a specific block id
fn get_delta_for_block(&self, search_id: &Uuid) -> Option<BlockDelta> {
let block_delta = self.block_delta.lock();
for delta in &*block_delta {
Expand Down Expand Up @@ -187,12 +197,15 @@ impl Blockfile for ArrowBlockfile {
Some(transaction_state) => transaction_state,
};

let mut transaction_sparse_index = transaction_state.new_sparse_index.lock();
// Get the target block id for the key
let mut transaction_sparse_index = transaction_state.sparse_index.lock();
let target_block_id = match *transaction_sparse_index {
None => self.sparse_index.lock().get_target_block_id(&key),
Some(ref index) => index.lock().get_target_block_id(&key),
};

// See if a delta for the target block already exists, if not create a new one and add it to the transaction state
// Creating a delta loads the block entirely into memory
let delta = match transaction_state.get_delta_for_block(&target_block_id) {
None => {
let target_block = match self.block_provider.get_block(&target_block_id) {
Expand All @@ -206,6 +219,8 @@ impl Blockfile for ArrowBlockfile {
Some(delta) => delta,
};

// Check if we can add to the delta without pushing the block over the max size.
// If we can't, we need to split the block and create a new delta
if delta.can_add(&key, &value) {
delta.add(key, value);
} else {
Expand All @@ -227,6 +242,7 @@ impl Blockfile for ArrowBlockfile {
}
transaction_state.add_delta(new_delta);
drop(transaction_sparse_index);
// Recursive call to add to the new appropriate delta
self.set(key, value)?
}
Ok(())
Expand Down Expand Up @@ -299,7 +315,7 @@ impl Blockfile for ArrowBlockfile {
None => return Err(Box::new(ArrowBlockfileError::NoSplitKeyFound)),
Some(key) => key,
};
let mut transaction_sparse_index = transaction_state.new_sparse_index.lock();
let mut transaction_sparse_index = transaction_state.sparse_index.lock();
match *transaction_sparse_index {
None => {
let new_sparse_index =
Expand Down Expand Up @@ -330,7 +346,7 @@ impl Blockfile for ArrowBlockfile {
}

// update the sparse index
let mut transaction_state_sparse_index = transaction_state.new_sparse_index.lock();
let mut transaction_state_sparse_index = transaction_state.sparse_index.lock();
if transaction_state_sparse_index.is_some() {
self.sparse_index = transaction_state_sparse_index.take().unwrap();
// unwrap is safe because we just checked it
Expand Down
1 change: 0 additions & 1 deletion rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl Ord for SparseIndexDelimiter {
/// - `replace_block` - Replace an existing block with a new one
/// - `len` - Get the number of blocks in the sparse index
/// - `is_valid` - Check if the sparse index is valid, useful for debugging and testing
#[derive(Clone)] // TODO: remove this clone
pub(super) struct SparseIndex {
forward: BTreeMap<SparseIndexDelimiter, Uuid>,
reverse: HashMap<Uuid, SparseIndexDelimiter>,
Expand Down

0 comments on commit b3a0456

Please sign in to comment.