Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Aug 20, 2024
1 parent 1a7712c commit bc806d5
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 31 deletions.
4 changes: 2 additions & 2 deletions rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ impl BlockManager {
let block = self.block_cache.get(id);
match block {
Some(block) => Some(block.clone()),
None => {
None => async {
let key = format!("block/{}", id);
let bytes_res = self
.storage
Expand Down Expand Up @@ -248,7 +248,7 @@ impl BlockManager {
return None;
}
}
}
}.instrument(tracing::trace_span!(parent: Span::current(), "BlockManager get cold")).await
}
}

Expand Down
66 changes: 42 additions & 24 deletions rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,36 @@ impl HnswIndexProvider {
}
}

async fn copy_bytes_to_local_file(
&self,
file_path: &PathBuf,
buf: Arc<Vec<u8>>,
) -> Result<(), Box<HnswIndexProviderFileError>> {
let file_handle = tokio::fs::File::create(&file_path).await;
let mut file_handle = match file_handle {
Ok(file) => file,
Err(e) => {
tracing::error!("Failed to create file: {}", e);
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
};
let res = file_handle.write_all(&buf).await;
match res {
Ok(_) => {}
Err(e) => {
tracing::error!("Failed to copy file: {}", e);
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
}
match file_handle.flush().await {
Ok(_) => Ok(()),
Err(e) => {
tracing::error!("Failed to flush file: {}", e);
Err(Box::new(HnswIndexProviderFileError::IOError(e)))
}
}
}

#[instrument]
async fn load_hnsw_segment_into_directory(
&self,
Expand All @@ -163,38 +193,26 @@ impl HnswIndexProvider {
let key = self.format_key(source_id, file);
tracing::info!("Loading hnsw index file: {}", key);
let bytes_res = self.storage.get(&key).await;
let bytes_read;
let buf = match bytes_res {
Ok(buf) => buf,
Ok(buf) => {
bytes_read = buf.len();
buf
}
Err(e) => {
tracing::error!("Failed to load hnsw index file from storage: {}", e);
return Err(Box::new(HnswIndexProviderFileError::StorageGetError(e)));
}
};

let file_path = index_storage_path.join(file);
// For now, we never evict from the cache, so if the index is being loaded, the file does not exist
let file_handle = tokio::fs::File::create(&file_path).await;
let mut file_handle = match file_handle {
Ok(file) => file,
Err(e) => {
tracing::error!("Failed to create file: {}", e);
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
};
let res = file_handle.write_all(&buf).await;
match res {
Ok(_) => {}
Err(e) => {
tracing::error!("Failed to copy file: {}", e);
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
}
match file_handle.flush().await {
Ok(_) => {}
Err(e) => {
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
}
self.copy_bytes_to_local_file(&file_path, buf).instrument(tracing::info_span!(parent: Span::current(), "hnsw provider copy bytes to local file", file = file)).await?;
tracing::info!(
"Copied {} bytes from storage key: {} to file: {}",
bytes_read,
key,
file_path.to_str().unwrap()
);
tracing::info!("Loaded hnsw index file: {}", file);
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions rust/storage/src/admissioncontrolleds3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ impl AdmissionControlledS3Storage {
) -> Result<Arc<Vec<u8>>, AdmissionControlledS3StorageError> {
let bytes_res = storage
.get(&key)
.instrument(tracing::trace_span!(parent: Span::current(), "Storage get"))
.instrument(tracing::trace_span!(parent: Span::current(), "S3 get"))
.await;
match bytes_res {
Ok(bytes) => {
return Ok(bytes);
}
Err(e) => {
tracing::error!("Error reading from storage: {}", e);
tracing::error!("Error reading from s3: {}", e);
return Err(AdmissionControlledS3StorageError::S3GetError(e));
}
}
Expand Down
6 changes: 3 additions & 3 deletions rust/storage/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ impl S3Storage {
pub async fn get(&self, key: &str) -> Result<Arc<Vec<u8>>, S3GetError> {
let mut stream = self
.get_stream(key)
.instrument(tracing::trace_span!(parent: Span::current(), "Storage get"))
.instrument(tracing::trace_span!(parent: Span::current(), "S3 get stream"))
.await?;
let read_block_span = tracing::trace_span!(parent: Span::current(), "Read bytes to end");
let read_block_span = tracing::trace_span!(parent: Span::current(), "S3 read bytes to end");
let buf = read_block_span
.in_scope(|| async {
let mut buf: Vec<u8> = Vec::new();
Expand All @@ -184,7 +184,7 @@ impl S3Storage {
buf.extend(chunk);
}
Err(err) => {
tracing::error!("Error reading from storage: {}", err);
tracing::error!("Error reading from S3: {}", err);
match err {
GetError::S3Error(e) => {
return Err(e);
Expand Down

0 comments on commit bc806d5

Please sign in to comment.