Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed Jul 16, 2024
1 parent 05543fb commit 3bcf445
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 53 deletions.
25 changes: 9 additions & 16 deletions rust/worker/src/blockstore/arrow/block/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ mod test {
let read = block.get::<&str, Int32Array>("prefix", &key).unwrap();
values_before_flush.push(read);
}
let blocks = vec![block.clone()];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&block.clone().id).await.unwrap();
for i in 0..n {
let key = format!("key{}", i);
Expand Down Expand Up @@ -292,12 +291,11 @@ mod test {
let read = block.get::<&str, &str>("prefix", &key);
values_before_flush.push(read.unwrap().to_string());
}
let blocks = vec![block.clone()];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();

let block = block_manager.get(&delta_id).await.unwrap();
// TODO: enable this assertion after the sizing is fixed
// assert_eq!(size, block.get_size());
assert_eq!(size, block.get_size());
for i in 0..n {
let key = format!("key{}", i);
let read = block.get::<&str, &str>("prefix", &key);
Expand All @@ -316,8 +314,7 @@ mod test {
let forked_block = block_manager.fork::<&str, &str>(&delta_id).await;
let new_id = forked_block.id.clone();
let block = block_manager.commit::<&str, &str>(&forked_block);
let blocks = vec![block.clone()];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();
let forked_block = block_manager.get(&new_id).await.unwrap();
for i in 0..n {
let key = format!("key{}", i);
Expand Down Expand Up @@ -351,8 +348,7 @@ mod test {
let read = block.get::<f32, &str>("prefix", key).unwrap();
values_before_flush.push(read);
}
let blocks = vec![block.clone()];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
assert_eq!(size, block.get_size());
for i in 0..n {
Expand Down Expand Up @@ -383,11 +379,10 @@ mod test {

let size = delta.get_size::<&str, &RoaringBitmap>();
let block = block_manager.commit::<&str, &RoaringBitmap>(&delta);
let blocks = vec![block];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
// TODO: enable this assertion after the sizing is fixed
// assert_eq!(size, block.get_size());
assert_eq!(size, block.get_size());

for i in 0..n {
let key = format!("{:04}", i);
Expand Down Expand Up @@ -448,8 +443,7 @@ mod test {

let size = delta.get_size::<&str, &DataRecord>();
let block = block_manager.commit::<&str, &DataRecord>(&delta);
let blocks = vec![block];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
for i in 0..3 {
let read = block.get::<&str, DataRecord>("", ids[i]).unwrap();
Expand Down Expand Up @@ -483,8 +477,7 @@ mod test {

let size = delta.get_size::<u32, &str>();
let block = block_manager.commit::<u32, &str>(&delta);
let blocks = vec![block];
block_manager.flush(&blocks).await.unwrap();
block_manager.flush(&block).await.unwrap();
let block = block_manager.get(&delta.id).await.unwrap();
assert_eq!(size, block.get_size());

Expand Down
3 changes: 1 addition & 2 deletions rust/worker/src/blockstore/arrow/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ mod types;
mod u32_key;
mod u32_value;
// Re-export types at the arrow_blockfile module level
// pub(in crate::blockstore::arrow) use types::*;
pub use types::*;
pub(in crate::blockstore) use types::*;
3 changes: 1 addition & 2 deletions rust/worker/src/blockstore/arrow/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use serde::Deserialize;

use crate::cache::config::CacheConfig;
use serde::Deserialize;

#[cfg(test)]
// A small block size for testing, so that triggering splits etc is easier
Expand Down
5 changes: 3 additions & 2 deletions rust/worker/src/blockstore/arrow/flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ impl ArrowBlockfileFlusher {
panic!("Invariant violation. Sparse index should be not empty during flush.");
}
// TODO: We could flush in parallel
self.block_manager.flush(&self.blocks).await?;

for block in &self.blocks {
self.block_manager.flush(block).await?;
}
self.sparse_index_manager
.flush::<K>(&self.sparse_index)
.await?;
Expand Down
4 changes: 2 additions & 2 deletions rust/worker/src/blockstore/arrow/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub mod block;
pub(crate) mod block;
pub(crate) mod blockfile;
mod concurrency_test;
pub(crate) mod config;
pub(crate) mod flusher;
pub(crate) mod provider;
pub mod sparse_index;
pub(crate) mod sparse_index;
pub(crate) mod types;
36 changes: 17 additions & 19 deletions rust/worker/src/blockstore/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,25 +266,23 @@ impl BlockManager {
}
}

pub(super) async fn flush(&self, block: &Vec<Block>) -> Result<(), Box<dyn ChromaError>> {
for block in block {
let bytes = match block.to_bytes() {
Ok(bytes) => bytes,
Err(e) => {
tracing::error!("Failed to convert block to bytes");
return Err(Box::new(e));
}
};
let key = format!("block/{}", block.id);
let res = self.storage.put_bytes(&key, bytes).await;
match res {
Ok(_) => {
tracing::info!("Block: {} written to storage", block.id);
}
Err(e) => {
tracing::info!("Error writing block to storage {}", e);
return Err(Box::new(e));
}
pub(super) async fn flush(&self, block: &Block) -> Result<(), Box<dyn ChromaError>> {
let bytes = match block.to_bytes() {
Ok(bytes) => bytes,
Err(e) => {
tracing::error!("Failed to convert block to bytes");
return Err(Box::new(e));
}
};
let key = format!("block/{}", block.id);
let res = self.storage.put_bytes(&key, bytes).await;
match res {
Ok(_) => {
tracing::info!("Block: {} written to storage", block.id);
}
Err(e) => {
tracing::info!("Error writing block to storage {}", e);
return Err(Box::new(e));
}
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/arrow/sparse_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Ord for SparseIndexDelimiter {
/// - `len` - Get the number of blocks in the sparse index
/// - `is_valid` - Check if the sparse index is valid, useful for debugging and testing
#[derive(Clone)]
pub struct SparseIndex {
pub(crate) struct SparseIndex {
pub(super) forward: Arc<Mutex<BTreeMap<SparseIndexDelimiter, Uuid>>>,
reverse: Arc<Mutex<HashMap<Uuid, SparseIndexDelimiter>>>,
pub(super) id: Uuid,
Expand Down
2 changes: 1 addition & 1 deletion rust/worker/src/blockstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ pub(crate) mod config;
pub mod key;
pub mod memory;
pub(crate) mod provider;
pub use types::*;
pub(crate) use types::*;
7 changes: 4 additions & 3 deletions rust/worker/src/cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ where
V: Send + Sync + Clone + 'static,
{
Unbounded(UnboundedCache<K, V>),
Foyer(FoyerCache<K, V>),
Foyer(FoyerCacheWrapper<K, V>),
}

impl<K: Send + Sync + Hash + Eq + 'static, V: Send + Sync + Clone + 'static> Cache<K, V> {
pub fn new(config: &CacheConfig) -> Self {
match config {
CacheConfig::Unbounded(_) => Cache::Unbounded(UnboundedCache::new(config)),
_ => Cache::Foyer(FoyerCacheWrapper::new(config).cache),
_ => Cache::Foyer(FoyerCacheWrapper::new(config)),
}
}

Expand All @@ -46,7 +46,7 @@ impl<K: Send + Sync + Hash + Eq + 'static, V: Send + Sync + Clone + 'static> Cac
let entry = cache.get(key);
match entry {
Some(v) => {
let value = v.value().to_owned();
let value = v.to_owned();
Some(value)
}
None => None,
Expand Down Expand Up @@ -93,6 +93,7 @@ where
}
}

#[derive(Clone)]
pub struct FoyerCacheWrapper<K, V>
where
K: Send + Sync + Hash + Eq + 'static,
Expand Down
3 changes: 0 additions & 3 deletions rust/worker/src/cache/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use crate::cache::cache::Cache;
use crate::errors::ChromaError;
use core::hash::Hash;
use serde::Deserialize;

#[derive(Deserialize, Debug, Clone)]
Expand Down
10 changes: 9 additions & 1 deletion rust/worker/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod cache;
pub mod config;
use crate::cache::cache::Cache;
use crate::cache::config::CacheConfig;
use crate::config::Configurable;
use crate::errors::ChromaError;
use std::hash::Hash;

Expand All @@ -12,5 +13,12 @@ where
K: Send + Sync + Hash + Eq + 'static,
V: Send + Sync + Clone + 'static,
{
Ok(Cache::new(config))
match config {
CacheConfig::Unbounded(_) => Ok(Cache::Unbounded(
crate::cache::cache::UnboundedCache::try_from_config(config).await?,
)),
_ => Ok(Cache::Foyer(
crate::cache::cache::FoyerCacheWrapper::try_from_config(config).await?,
)),
}
}
1 change: 0 additions & 1 deletion rust/worker/src/storage/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::cache::config::CacheConfig;
use serde::Deserialize;

#[derive(Deserialize, Debug)]
Expand Down

0 comments on commit 3bcf445

Please sign in to comment.