diff --git a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs index 006ef83fcb8..6a5c2fc296d 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/blockfile.rs @@ -12,6 +12,9 @@ use uuid::Uuid; pub(super) const MAX_BLOCK_SIZE: usize = 16384; +/// ArrowBlockfile is a blockfile implementation that uses Apache Arrow for the block storage. +/// It stores a sparse index over a set of blocks sorted by key. +/// It uses a block provider to create new blocks and to retrieve existing blocks. #[derive(Clone)] pub(crate) struct ArrowBlockfile { key_type: KeyType, @@ -21,24 +24,31 @@ pub(crate) struct ArrowBlockfile { transaction_state: Option>, } +/// TransactionState is a helper struct to keep track of the state of a transaction. +/// It keeps a list of block deltas that are applied during the transaction and the new +/// sparse index that is created during the transaction. The sparse index is immutable +/// so we can replace the sparse index of the blockfile with the new sparse index after +/// the transaction is committed. struct TransactionState { block_delta: Mutex>, - new_sparse_index: Mutex>>>, + sparse_index: Mutex>>>, } impl TransactionState { fn new() -> Self { Self { block_delta: Mutex::new(Vec::new()), - new_sparse_index: Mutex::new(None), + sparse_index: Mutex::new(None), } } + /// Add a new block delta to the transaction state fn add_delta(&self, delta: BlockDelta) { let mut block_delta = self.block_delta.lock(); block_delta.push(delta); } + /// Get the block delta for a specific block id fn get_delta_for_block(&self, search_id: &Uuid) -> Option { let block_delta = self.block_delta.lock(); for delta in &*block_delta { @@ -187,12 +197,15 @@ impl Blockfile for ArrowBlockfile { Some(transaction_state) => transaction_state, }; - let mut transaction_sparse_index = transaction_state.new_sparse_index.lock(); + // Get the target block id for the key + let mut transaction_sparse_index = transaction_state.sparse_index.lock(); let target_block_id = match *transaction_sparse_index { None => self.sparse_index.lock().get_target_block_id(&key), Some(ref index) => index.lock().get_target_block_id(&key), }; + // See if a delta for the target block already exists, if not create a new one and add it to the transaction state + // Creating a delta loads the block entirely into memory let delta = match transaction_state.get_delta_for_block(&target_block_id) { None => { let target_block = match self.block_provider.get_block(&target_block_id) { @@ -206,6 +219,8 @@ impl Blockfile for ArrowBlockfile { Some(delta) => delta, }; + // Check if we can add to the the delta without pushing the block over the max size. + // If we can't, we need to split the block and create a new delta if delta.can_add(&key, &value) { delta.add(key, value); } else { @@ -227,6 +242,7 @@ impl Blockfile for ArrowBlockfile { } transaction_state.add_delta(new_delta); drop(transaction_sparse_index); + // Recursive call to add to the new appropriate delta self.set(key, value)? } Ok(()) @@ -299,7 +315,7 @@ impl Blockfile for ArrowBlockfile { None => return Err(Box::new(ArrowBlockfileError::NoSplitKeyFound)), Some(key) => key, }; - let mut transaction_sparse_index = transaction_state.new_sparse_index.lock(); + let mut transaction_sparse_index = transaction_state.sparse_index.lock(); match *transaction_sparse_index { None => { let new_sparse_index = @@ -330,7 +346,7 @@ impl Blockfile for ArrowBlockfile { } // update the sparse index - let mut transaction_state_sparse_index = transaction_state.new_sparse_index.lock(); + let mut transaction_state_sparse_index = transaction_state.sparse_index.lock(); if transaction_state_sparse_index.is_some() { self.sparse_index = transaction_state_sparse_index.take().unwrap(); // unwrap is safe because we just checked it diff --git a/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs b/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs index 8c2a423e107..134252614ec 100644 --- a/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs +++ b/rust/worker/src/blockstore/arrow_blockfile/sparse_index.rs @@ -65,7 +65,6 @@ impl Ord for SparseIndexDelimiter { /// - `replace_block` - Replace an existing block with a new one /// - `len` - Get the number of blocks in the sparse index /// - `is_valid` - Check if the sparse index is valid, useful for debugging and testing -#[derive(Clone)] // TODO: remove this clone pub(super) struct SparseIndex { forward: BTreeMap, reverse: HashMap,