diff --git a/rust/worker/src/blockstore/arrow/flusher.rs b/rust/worker/src/blockstore/arrow/flusher.rs index a8aaa9c7d4f..b559aba61a7 100644 --- a/rust/worker/src/blockstore/arrow/flusher.rs +++ b/rust/worker/src/blockstore/arrow/flusher.rs @@ -1,11 +1,10 @@ -use crate::errors::ChromaError; - use super::{ provider::{BlockManager, SparseIndexManager}, sparse_index::SparseIndex, types::{ArrowWriteableKey, ArrowWriteableValue}, }; -use std::collections::{HashMap, HashSet}; +use crate::errors::ChromaError; +use std::collections::HashSet; use uuid::Uuid; pub(crate) struct ArrowBlockfileFlusher { @@ -37,14 +36,13 @@ impl ArrowBlockfileFlusher { pub(crate) async fn flush( self, ) -> Result<(), Box> { + // TODO: We could flush in parallel for delta_id in self.modified_delta_ids { - self.block_manager.flush(&delta_id).await; + self.block_manager.flush(&delta_id).await? } - // TODO: catch errors from the flush - let res = self - .sparse_index_manager + self.sparse_index_manager .flush::(&self.sparse_index.id) - .await; + .await?; Ok(()) } diff --git a/rust/worker/src/blockstore/arrow/provider.rs b/rust/worker/src/blockstore/arrow/provider.rs index 046d79aab54..ccba2233336 100644 --- a/rust/worker/src/blockstore/arrow/provider.rs +++ b/rust/worker/src/blockstore/arrow/provider.rs @@ -1,23 +1,24 @@ use super::{ - block::{self, delta::BlockDelta, Block}, - blockfile::{self, ArrowBlockfileReader, ArrowBlockfileWriter}, - sparse_index::{self, SparseIndex}, + block::{delta::BlockDelta, Block}, + blockfile::{ArrowBlockfileReader, ArrowBlockfileWriter}, + sparse_index::SparseIndex, types::{ArrowReadableKey, ArrowReadableValue, ArrowWriteableKey, ArrowWriteableValue}, }; use crate::{ blockstore::{ key::KeyWrapper, memory::storage::Readable, - provider::{BlockfileProvider, CreateError, OpenError}, + provider::{CreateError, OpenError}, BlockfileReader, BlockfileWriter, Key, Value, }, - errors::ChromaError, + errors::{ChromaError, ErrorCodes}, storage::Storage, }; use core::panic; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use std::{collections::HashMap, sync::Arc}; -use tokio::{io::AsyncReadExt, pin}; +use thiserror::Error; +use tokio::io::AsyncReadExt; use uuid::Uuid; /// A BlockFileProvider that creates ArrowBlockfiles (Arrow-backed blockfiles used for production). @@ -192,35 +193,9 @@ impl BlockManager { } } } - - // match cache.get(id) { - // Some(block) => Some(block.clone()), - // None => { - // let key = format!("block/{}", id); - // let bytes = self.storage.get(&key).await; - // match bytes { - // Ok(mut bytes) => { - // let mut buf: Vec = Vec::new(); - // bytes.read_to_end(&mut buf); - // let block = Block::from_bytes(&buf); - // match block { - // Ok(block) => { - // self.read_cache.write().insert(*id, block.clone()); - // Some(block) - // } - // Err(_) => { - // // TODO: log error - // None - // } - // } - // } - // Err(_) => None, - // } - // } - // } } - pub(super) async fn flush(&self, id: &Uuid) { + pub(super) async fn flush(&self, id: &Uuid) -> Result<(), Box> { let block = self.get(id).await; match block { @@ -230,15 +205,32 @@ impl BlockManager { let res = self.storage.put_bytes(&key, bytes).await; match res { Ok(_) => { - println!("Block written to storage") + println!("Block: {} written to storage", id); + Ok(()) } Err(e) => { println!("Error writing block to storage {}", e); + Err(Box::new(e)) } } - // TODO: error handling } - None => {} + None => { + return Err(Box::new(BlockFlushError::NotFound)); + } + } + } +} + +#[derive(Error, Debug)] +pub enum BlockFlushError { + #[error("Not found")] + NotFound, +} + +impl ChromaError for BlockFlushError { + fn code(&self) -> ErrorCodes { + match self { + BlockFlushError::NotFound => ErrorCodes::NotFound, } } } @@ -332,7 +324,10 @@ impl SparseIndexManager { self.cache.write().insert(index.id, index); } - pub async fn flush<'read, K: ArrowWriteableKey + 'read>(&self, id: &Uuid) { + pub async fn flush<'read, K: ArrowWriteableKey + 'read>( + &self, + id: &Uuid, + ) -> Result<(), Box> { let index = self.get::>(id).await; match index { Some(index) => { @@ -344,22 +339,24 @@ impl SparseIndexManager { let res = self.storage.put_bytes(&key, bytes).await; match res { Ok(_) => { - println!("Sparse index written to storage") + println!("Sparse index written to storage"); + Ok(()) } - Err(_) => { + Err(e) => { println!("Error writing sparse index to storage"); + Err(Box::new(e)) } } } - Err(_) => { - // TODO: error - panic!("Failed to convert sparse index to block"); + Err(e) => { + println!("Failed to convert sparse index to block"); + Err(e) } } } None => { - // TODO: error - panic!("Tried to flush a sparse index that doesn't exist"); + println!("Tried to flush a sparse index that doesn't exist"); + return Err(Box::new(SparseIndexFlushError::NotFound)); } } } @@ -377,3 +374,17 @@ impl SparseIndexManager { forked } } + +#[derive(Error, Debug)] +pub enum SparseIndexFlushError { + #[error("Not found")] + NotFound, +} + +impl ChromaError for SparseIndexFlushError { + fn code(&self) -> ErrorCodes { + match self { + SparseIndexFlushError::NotFound => ErrorCodes::NotFound, + } + } +} diff --git a/rust/worker/src/index/hnsw_provider.rs b/rust/worker/src/index/hnsw_provider.rs index 68144efc248..35706b2fcb8 100644 --- a/rust/worker/src/index/hnsw_provider.rs +++ b/rust/worker/src/index/hnsw_provider.rs @@ -187,8 +187,8 @@ impl HnswIndexProvider { println!("Flushed hnsw index file: {}", file); } Err(e) => { - // TODO: return err - panic!("Failed to flush index: {}", e); + println!("Failed to flush index: {}", e); + return Err(Box::new(e)); } } } diff --git a/rust/worker/src/storage/mod.rs b/rust/worker/src/storage/mod.rs index 25577a6652d..94b618f9032 100644 --- a/rust/worker/src/storage/mod.rs +++ b/rust/worker/src/storage/mod.rs @@ -1,12 +1,12 @@ use self::config::StorageConfig; +use self::s3::S3GetError; use crate::config::Configurable; -use crate::errors::ChromaError; -use async_trait::async_trait; -use bytes::Bytes; +use crate::errors::{ChromaError, ErrorCodes}; use tokio::io::AsyncBufRead; pub(crate) mod config; pub(crate) mod local; pub(crate) mod s3; +use thiserror::Error; #[derive(Clone)] pub(crate) enum Storage { @@ -14,28 +14,93 @@ pub(crate) enum Storage { Local(local::LocalStorage), } +#[derive(Error, Debug)] +pub enum GetError { + #[error("No such key: {0}")] + NoSuchKey(String), + #[error("S3 error: {0}")] + S3Error(#[from] S3GetError), + #[error("Local storage error: {0}")] + LocalError(String), +} + +impl ChromaError for GetError { + fn code(&self) -> ErrorCodes { + match self { + GetError::NoSuchKey(_) => ErrorCodes::NotFound, + GetError::S3Error(_) => ErrorCodes::Internal, + GetError::LocalError(_) => ErrorCodes::Internal, + } + } +} + +#[derive(Error, Debug)] +pub enum PutError { + #[error("S3 error: {0}")] + S3Error(#[from] s3::S3PutError), + #[error("Local storage error: {0}")] + LocalError(String), +} + +impl ChromaError for PutError { + fn code(&self) -> ErrorCodes { + match self { + PutError::S3Error(_) => ErrorCodes::Internal, + PutError::LocalError(_) => ErrorCodes::Internal, + } + } +} + impl Storage { pub(crate) async fn get( &self, key: &str, - ) -> Result, String> { + ) -> Result, GetError> { match self { - Storage::S3(s3) => s3.get(key).await, - Storage::Local(local) => local.get(key).await, + Storage::S3(s3) => { + let res = s3.get(key).await; + match res { + Ok(res) => Ok(res), + Err(e) => match e { + S3GetError::NoSuchKey(_) => Err(GetError::NoSuchKey(key.to_string())), + _ => Err(GetError::S3Error(e)), + }, + } + } + Storage::Local(local) => { + let res = local.get(key).await; + match res { + Ok(res) => Ok(res), + // TODO: Special case no such key if possible + Err(e) => Err(GetError::LocalError(e)), + } + } } } - pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), String> { + pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), PutError> { match self { - Storage::S3(s3) => s3.put_file(key, path).await, - Storage::Local(local) => local.put_file(key, path).await, + Storage::S3(s3) => s3 + .put_file(key, path) + .await + .map_err(|e| PutError::S3Error(e)), + Storage::Local(local) => local + .put_file(key, path) + .await + .map_err(|e| PutError::LocalError(e)), } } - pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec) -> Result<(), String> { + pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec) -> Result<(), PutError> { match self { - Storage::S3(s3) => s3.put_bytes(key, bytes).await, - Storage::Local(local) => local.put_bytes(key, &bytes).await, + Storage::S3(s3) => s3 + .put_bytes(key, bytes) + .await + .map_err(|e| PutError::S3Error(e)), + Storage::Local(local) => local + .put_bytes(key, &bytes) + .await + .map_err(|e| PutError::LocalError(e)), } } } diff --git a/rust/worker/src/storage/s3.rs b/rust/worker/src/storage/s3.rs index 5ab330c2989..ceb03483874 100644 --- a/rust/worker/src/storage/s3.rs +++ b/rust/worker/src/storage/s3.rs @@ -8,17 +8,16 @@ // Once we move to our own implementation of hnswlib we can support // streaming from s3. -use super::{config::StorageConfig, Storage}; +use super::config::StorageConfig; use crate::config::Configurable; use crate::errors::ChromaError; use async_trait::async_trait; use aws_sdk_s3; +use aws_sdk_s3::error::ProvideErrorMetadata; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::create_bucket::CreateBucketError; use aws_smithy_types::byte_stream::ByteStream; -use bytes::Bytes; use std::clone::Clone; -use std::io::Write; use thiserror::Error; use tokio::io::AsyncBufRead; @@ -28,6 +27,32 @@ pub(crate) struct S3Storage { client: aws_sdk_s3::Client, } +#[derive(Error, Debug)] +pub enum S3PutError { + #[error("S3 PUT error: {0}")] + S3PutError(String), +} + +impl ChromaError for S3PutError { + fn code(&self) -> crate::errors::ErrorCodes { + crate::errors::ErrorCodes::Internal + } +} + +#[derive(Error, Debug)] +pub enum S3GetError { + #[error("S3 GET error: {0}")] + S3GetError(String), + #[error("No such key: {0}")] + NoSuchKey(String), +} + +impl ChromaError for S3GetError { + fn code(&self) -> crate::errors::ErrorCodes { + crate::errors::ErrorCodes::Internal + } +} + impl S3Storage { fn new(bucket: &str, client: aws_sdk_s3::Client) -> S3Storage { return S3Storage { @@ -77,7 +102,7 @@ impl S3Storage { pub(crate) async fn get( &self, key: &str, - ) -> Result, String> { + ) -> Result, S3GetError> { let res = self .client .get_object() @@ -97,47 +122,45 @@ impl S3Storage { match inner { aws_sdk_s3::operation::get_object::GetObjectError::NoSuchKey(msg) => { println!("no such key: {}", msg); - return Err::<_, String>(msg.to_string()); + return Err(S3GetError::NoSuchKey(msg.to_string())); } aws_sdk_s3::operation::get_object::GetObjectError::InvalidObjectState(msg) => { print!("invalid object state: {}", msg); - return Err::<_, String>(msg.to_string()); + return Err(S3GetError::S3GetError(msg.to_string())); } aws_sdk_s3::operation::get_object::GetObjectError::Unhandled(_) => { println!("unhandled error"); - return Err::<_, String>("unhandled error".to_string()); + return Err(S3GetError::S3GetError("unhandled error".to_string())); } _ => { println!("error: {}", inner.to_string()); - return Err::<_, String>(inner.to_string()); + return Err(S3GetError::S3GetError(inner.to_string())); } }; } _ => {} } - return Err::<_, String>(e.to_string()); + return Err(S3GetError::S3GetError(e.to_string())); } } } - pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec) -> Result<(), String> { + pub(crate) async fn put_bytes(&self, key: &str, bytes: Vec) -> Result<(), S3PutError> { let bytestream = ByteStream::from(bytes); self.put_bytestream(key, bytestream).await } - pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), String> { + pub(crate) async fn put_file(&self, key: &str, path: &str) -> Result<(), S3PutError> { let bytestream = ByteStream::from_path(path).await; match bytestream { - Ok(bytestream) => { - return self.put_bytestream(key, bytestream).await; - } + Ok(bytestream) => return self.put_bytestream(key, bytestream).await, Err(e) => { - return Err::<(), String>(e.to_string()); + return Err(S3PutError::S3PutError(e.to_string())); } } } - async fn put_bytestream(&self, key: &str, bytestream: ByteStream) -> Result<(), String> { + async fn put_bytestream(&self, key: &str, bytestream: ByteStream) -> Result<(), S3PutError> { let res = self .client .put_object() @@ -151,10 +174,22 @@ impl S3Storage { println!("put object {} to bucket {}", key, self.bucket); return Ok(()); } - Err(e) => { - println!("s3 error: {}", e); - return Err::<(), String>(e.to_string()); - } + Err(e) => match e { + SdkError::ServiceError(err) => { + let inner_err = err.into_err(); + let err_string = format!( + "S3 service error with code: {:?} and message: {:?}", + inner_err.code(), + inner_err.message() + ); + println!("{}", err_string); + return Err(S3PutError::S3PutError(err_string)); + } + _ => { + println!("S3 Put Error: {}", e); + return Err(S3PutError::S3PutError(e.to_string())); + } + }, } } }