Skip to content

Commit

Permalink
[ENH] Add block delta
Browse files Browse the repository at this point in the history
  • Loading branch information
HammadB committed Mar 7, 2024
1 parent d344838 commit 401f43a
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 9 deletions.
Empty file.
15 changes: 8 additions & 7 deletions rust/worker/src/blockstore/arrow_blockfile/block/delta.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use super::{Block, BlockBuilderOptions, BlockData, BlockDataBuilder};
use crate::blockstore::{
arrow_blockfile::{blockfile::MAX_BLOCK_SIZE, provider::ArrowBlockProvider},
types::{BlockfileKey, KeyType, Value, ValueType},
Expand All @@ -6,8 +7,6 @@ use arrow::util::bit_util;
use parking_lot::RwLock;
use std::{collections::BTreeMap, sync::Arc};

use super::{Block, BlockBuilderOptions, BlockData, BlockDataBuilder};

#[derive(Clone)]
pub struct BlockDelta {
pub source_block: Arc<Block>,
Expand Down Expand Up @@ -247,8 +246,10 @@ impl BlockDeltaInner {
}
}

impl From<&BlockDelta> for BlockData {
fn from(delta: &BlockDelta) -> Self {
impl TryFrom<&BlockDelta> for BlockData {
type Error = super::BlockDataBuildError;

fn try_from(delta: &BlockDelta) -> Result<BlockData, Self::Error> {
let mut builder = BlockDataBuilder::new(
delta.source_block.get_key_type(),
delta.source_block.get_value_type(),
Expand Down Expand Up @@ -311,7 +312,7 @@ mod test {
}

let size = delta.get_size();
let block_data = BlockData::from(&delta);
let block_data = BlockData::try_from(&delta).unwrap();
assert_eq!(size, block_data.get_size());
}

Expand All @@ -328,7 +329,7 @@ mod test {
delta.add(key, value);
}
let size = delta.get_size();
let block_data = BlockData::from(&delta);
let block_data = BlockData::try_from(&delta).unwrap();
assert_eq!(size, block_data.get_size());
}

Expand All @@ -346,7 +347,7 @@ mod test {
}

let size = delta.get_size();
let block_data = BlockData::from(&delta);
let block_data = BlockData::try_from(&delta).unwrap();
assert_eq!(size, block_data.get_size());
}
}
2 changes: 0 additions & 2 deletions rust/worker/src/blockstore/arrow_blockfile/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
mod blockfile;
mod delta;
mod iterator;
mod provider;
mod types;

// Re-export types at the arrow_blockfile module level
Expand Down
Empty file.
27 changes: 27 additions & 0 deletions rust/worker/src/blockstore/arrow_blockfile/block/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::sync::Arc;
use thiserror::Error;
use uuid::Uuid;

use super::delta::BlockDelta;
use super::iterator::BlockIterator;

/// BlockState represents the state of a block in the blockstore. Conceptually, a block is immutable once the broarder system
Expand Down Expand Up @@ -58,12 +59,15 @@ pub struct Block {
pub enum BlockError {
#[error("Invalid state transition")]
InvalidStateTransition,
#[error("Block data error")]
BlockDataError(#[from] BlockDataBuildError),
}

impl ChromaError for BlockError {
fn code(&self) -> ErrorCodes {
match self {
BlockError::InvalidStateTransition => ErrorCodes::Internal,
BlockError::BlockDataError(e) => e.code(),
}
}
}
Expand Down Expand Up @@ -202,6 +206,29 @@ impl Block {
}
}

pub fn apply_delta(&self, delta: &BlockDelta) -> Result<(), Box<BlockError>> {
let data = match BlockData::try_from(delta) {
Ok(data) => data,
Err(e) => return Err(Box::new(BlockError::BlockDataError(e))),
};
let mut inner = self.inner.write();
match inner.state {
BlockState::Uninitialized => {
inner.data = Some(data);
inner.state = BlockState::Initialized;
Ok(())
}
BlockState::Initialized => {
inner.data = Some(data);
inner.state = BlockState::Initialized;
Ok(())
}
BlockState::Commited | BlockState::Registered => {
Err(Box::new(BlockError::InvalidStateTransition))
}
}
}

pub(super) fn iter(&self) -> BlockIterator {
BlockIterator::new(
self.clone(),
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/blockstore/arrow_blockfile/blockfile.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub(super) const MAX_BLOCK_SIZE: usize = 16384;
2 changes: 2 additions & 0 deletions rust/worker/src/blockstore/arrow_blockfile/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
mod block;
mod blockfile;
mod provider;
37 changes: 37 additions & 0 deletions rust/worker/src/blockstore/arrow_blockfile/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use super::block::Block;
use crate::blockstore::{KeyType, ValueType};
use parking_lot::RwLock;
use std::{collections::HashMap, sync::Arc};
use uuid::Uuid;

struct ArrowBlockProviderInner {
blocks: HashMap<Uuid, Arc<Block>>,
}

#[derive(Clone)]
pub(super) struct ArrowBlockProvider {
inner: Arc<RwLock<ArrowBlockProviderInner>>,
}

impl ArrowBlockProvider {
pub(super) fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(ArrowBlockProviderInner {
blocks: HashMap::new(),
})),
}
}

pub(super) fn create_block(&self, key_type: KeyType, value_type: ValueType) -> Arc<Block> {
let block = Arc::new(Block::new(Uuid::new_v4(), key_type, value_type));
self.inner
.write()
.blocks
.insert(block.get_id(), block.clone());
block
}

pub(super) fn get_block(&self, id: &Uuid) -> Option<Arc<Block>> {
self.inner.read().blocks.get(id).cloned()
}
}

0 comments on commit 401f43a

Please sign in to comment.