Skip to content

Commit

Permalink
Not invoke callback
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Aug 8, 2024
1 parent 53382ec commit 6207879
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 89 deletions.
91 changes: 26 additions & 65 deletions rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,80 +213,41 @@ impl BlockManager {
match block {
Some(block) => Some(block.clone()),
None => {
// TODO: NAC register/deregister/validation goes here.
async {
let key = format!("block/{}", id);
let stream = self.storage.get_internal(&key).instrument(
let key = format!("block/{}", id);
let bytes_res = self
.storage
.get(&key)
.instrument(
tracing::trace_span!(parent: Span::current(), "BlockManager storage get"),
).await;
match stream {
Ok(mut bytes) => {
let read_block_span = tracing::trace_span!(parent: Span::current(), "BlockManager read bytes to end");
let buf = read_block_span.in_scope(|| async {
let mut buf: Vec<u8> = Vec::new();
while let Some(res) = bytes.next().await {
match res {
Ok(chunk) => {
buf.extend(chunk);
}
Err(e) => {
tracing::error!("Error reading block from storage: {}", e);
return None;
}
}
}
Some(buf)
)
.await;
match bytes_res {
Ok(bytes) => {
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
let block =
deserialization_span.in_scope(|| Block::from_bytes(&bytes, *id));
match block {
Ok(block) => {
self.block_cache.insert(*id, block.clone());
Some(block)
}
).await;
let buf = match buf {
Some(buf) => {
buf
}
None => {
return None;
}
};
tracing::info!("Read {:?} bytes from s3", buf.len());
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
let block = deserialization_span.in_scope(|| Block::from_bytes(&buf, *id));
match block {
Ok(block) => {
self.block_cache.insert(*id, block.clone());
Some(block)
}
Err(e) => {
// TODO: Return an error to callsite instead of None.
tracing::error!(
"Error converting bytes to Block {:?}/{:?}",
key,
e
);
None
}
Err(e) => {
// TODO: Return an error to callsite instead of None.
tracing::error!(
"Error converting bytes to Block {:?}/{:?}",
key,
e
);
None
}
},
Err(e) => {
tracing::error!("Error converting bytes to Block {:?}", e);
return Err(Box::new(
NetworkAdmissionControlError::DeserializationError,
));
}
}
Ok(())
};
match self.network_admission_control.get(key, cb).await {
Ok(()) => {}
Err(e) => {
// TODO: Return error here.
tracing::error!(
"Error getting block from the network admission control {}",
e
);
tracing::error!("Error converting bytes to Block {:?}", e);
// TODO: Return error instead of None.
return None;
}
}
// Cache must be populated now.
self.block_cache.get(id)
}
}
}
Expand Down
40 changes: 16 additions & 24 deletions rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,9 @@ impl HnswIndexProvider {
for file in FILES.iter() {
let key = self.format_key(source_id, file);
tracing::info!("Loading hnsw index file: {}", key);
let stream = self.storage.get_internal(&key).await;
let reader = match stream {
Ok(reader) => reader,
let bytes_res = self.storage.get(&key).await;
let buf = match bytes_res {
Ok(buf) => buf,
Err(e) => {
tracing::error!("Failed to load hnsw index file from storage: {}", e);
return Err(Box::new(HnswIndexProviderFileError::StorageGetError(e)));
Expand All @@ -181,28 +181,18 @@ impl HnswIndexProvider {
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
};
let cb = move |buf: Vec<u8>| async move {
let res = file_handle.write_all(&buf).await;
match res {
Ok(_) => {}
Err(e) => {
tracing::error!("Failed to copy file: {}", e);
return Err(Box::new(NetworkAdmissionControlError::IOError));
}
}
match file_handle.flush().await {
Ok(_) => {}
Err(e) => {
tracing::error!("Failed to flush file: {}", e);
return Err(Box::new(NetworkAdmissionControlError::IOError));
}
let res = file_handle.write_all(&buf).await;
match res {
Ok(_) => {}
Err(e) => {
tracing::error!("Failed to copy file: {}", e);
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
Ok(())
};
match self.network_admission_control.get(key, cb).await {
}
match file_handle.flush().await {
Ok(_) => {}
Err(e) => {
return Err(Box::new(HnswIndexProviderFileError::NACError(*e)));
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
}
tracing::info!("Loaded hnsw index file: {}", file);
Expand Down Expand Up @@ -466,8 +456,10 @@ impl ChromaError for HnswIndexProviderFlushError {
pub enum HnswIndexProviderFileError {
#[error("IO Error")]
IOError(#[from] std::io::Error),
#[error("NAC Error")]
NACError(#[from] NetworkAdmissionControlError),
#[error("Storage Get Error")]
StorageGetError(#[from] chroma_storage::GetError),
#[error("Storage Put Error")]
StoragePutError(#[from] chroma_storage::PutError),
}

#[cfg(test)]
Expand Down

0 comments on commit 6207879

Please sign in to comment.