Skip to content

Commit

Permalink
[ENH] Block manager and HNSW provider consume Network Admission Control (#2631)
Browse files Browse the repository at this point in the history

## Description of changes

*Summarize the changes made by this PR.*
- Blockfile Manager and HNSW Index Provider use the NAC when doing get() calls.
- The NAC returns a future that callers can await. Once they have the bytes, they can perform specific operations such as deserializing them into a block or writing them to local disk; these operations need to be made thread-safe.

## Test plan
- [x] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Aug 20, 2024
1 parent d48f4fd commit 2b098f8
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 115 deletions.
88 changes: 32 additions & 56 deletions rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,68 +212,43 @@ impl BlockManager {
let block = self.block_cache.get(id);
match block {
Some(block) => Some(block.clone()),
None => {
// TODO: NAC register/deregister/validation goes here.
async {
let key = format!("block/{}", id);
let stream = self.storage.get_stream(&key).instrument(
None => async {
let key = format!("block/{}", id);
let bytes_res = self
.storage
.get(&key)
.instrument(
tracing::trace_span!(parent: Span::current(), "BlockManager storage get"),
).await;
match stream {
Ok(mut bytes) => {
let read_block_span = tracing::trace_span!(parent: Span::current(), "BlockManager read bytes to end");
let buf = read_block_span.in_scope(|| async {
let mut buf: Vec<u8> = Vec::new();
while let Some(res) = bytes.next().await {
match res {
Ok(chunk) => {
buf.extend(chunk);
}
Err(e) => {
tracing::error!("Error reading block from storage: {}", e);
return None;
}
}
}
Some(buf)
)
.await;
match bytes_res {
Ok(bytes) => {
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
let block =
deserialization_span.in_scope(|| Block::from_bytes(&bytes, *id));
match block {
Ok(block) => {
self.block_cache.insert(*id, block.clone());
Some(block)
}
).await;
let buf = match buf {
Some(buf) => {
buf
}
None => {
return None;
}
};
tracing::info!("Read {:?} bytes from s3", buf.len());
let deserialization_span = tracing::trace_span!(parent: Span::current(), "BlockManager deserialize block");
let block = deserialization_span.in_scope(|| Block::from_bytes(&buf, *id));
match block {
Ok(block) => {
self.block_cache.insert(*id, block.clone());
Some(block)
}
Err(e) => {
// TODO: Return an error to callsite instead of None.
tracing::error!(
"Error converting bytes to Block {:?}/{:?}",
key,
e
);
None
}
Err(e) => {
// TODO: Return an error to callsite instead of None.
tracing::error!(
"Error converting bytes to Block {:?}/{:?}",
key,
e
);
None
}
},
Err(e) => {
tracing::error!("Error reading block from storage: {}", e);
None
}
}
Err(e) => {
tracing::error!("Error converting bytes to Block {:?}", e);
// TODO: Return error instead of None.
return None;
}
}
.instrument(tracing::trace_span!(parent: Span::current(), "BlockManager get cold"))
.await
}
}.instrument(tracing::trace_span!(parent: Span::current(), "BlockManager get cold")).await
}
}

Expand Down Expand Up @@ -341,6 +316,7 @@ impl SparseIndexManager {
tracing::info!("Cache miss - fetching sparse index from storage");
let key = format!("sparse_index/{}", id);
tracing::debug!("Reading sparse index from storage with key: {}", key);
// TODO: This should pass through NAC as well.
let stream = self.storage.get_stream(&key).await;
let mut buf: Vec<u8> = Vec::new();
match stream {
Expand Down
95 changes: 41 additions & 54 deletions rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,86 +151,73 @@ impl HnswIndexProvider {
}
}

/// Writes `buf` to a newly created (or truncated) file at `file_path`,
/// flushing before returning.
///
/// Any I/O failure is logged and wrapped in
/// `HnswIndexProviderFileError::IOError`.
async fn copy_bytes_to_local_file(
    &self,
    file_path: &PathBuf,
    buf: Arc<Vec<u8>>,
) -> Result<(), Box<HnswIndexProviderFileError>> {
    // Create the destination file; callers expect a fresh copy on disk.
    let mut file = tokio::fs::File::create(&file_path).await.map_err(|e| {
        tracing::error!("Failed to create file: {}", e);
        Box::new(HnswIndexProviderFileError::IOError(e))
    })?;
    // Write the entire buffer in one call.
    if let Err(e) = file.write_all(&buf).await {
        tracing::error!("Failed to copy file: {}", e);
        return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
    }
    // Flush so the bytes are handed to the OS before we report success.
    file.flush().await.map_err(|e| {
        tracing::error!("Failed to flush file: {}", e);
        Box::new(HnswIndexProviderFileError::IOError(e))
    })
}

#[instrument]
async fn load_hnsw_segment_into_directory(
&self,
source_id: &Uuid,
index_storage_path: &Path,
) -> Result<(), Box<HnswIndexProviderFileError>> {
// Fetch the files from storage and put them in the index storage path
// Fetch the files from storage and put them in the index storage path.
// TODO: Fetch multiple chunks in parallel from S3.
for file in FILES.iter() {
let key = self.format_key(source_id, file);
tracing::info!("Loading hnsw index file: {}", key);
let stream = self.storage.get_stream(&key).await;
let reader = match stream {
Ok(reader) => reader,
let bytes_res = self.storage.get(&key).await;
let bytes_read;
let buf = match bytes_res {
Ok(buf) => {
bytes_read = buf.len();
buf
}
Err(e) => {
tracing::error!("Failed to load hnsw index file from storage: {}", e);
return Err(Box::new(HnswIndexProviderFileError::StorageGetError(e)));
}
};

let file_path = index_storage_path.join(file);
// For now, we never evict from the cache, so if the index is being loaded, the file does not exist
let file_handle = tokio::fs::File::create(&file_path).await;
let file_handle = match file_handle {
Ok(file) => file,
Err(e) => {
tracing::error!("Failed to create file: {}", e);
return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
}
};
let total_bytes_written = self
.copy_stream_to_local_file(reader, file_handle)
.instrument(tracing::info_span!(parent: Span::current(), "hnsw provider file read", file = file))
.await?;
self.copy_bytes_to_local_file(&file_path, buf).instrument(tracing::info_span!(parent: Span::current(), "hnsw provider copy bytes to local file", file = file)).await?;
tracing::info!(
"Copied {} bytes from storage key: {} to file: {}",
total_bytes_written,
bytes_read,
key,
file_path.to_str().unwrap()
);
// bytes is an AsyncBufRead, so we fil and consume it to a file
tracing::info!("Loaded hnsw index file: {}", file);
}
Ok(())
}

/// Drains `stream` chunk by chunk into `file_handle`, flushing at the end.
///
/// Returns the total number of bytes written on success. A storage-read
/// failure maps to `StorageGetError`; a write or flush failure maps to
/// `IOError`. Bytes written before a failure are not cleaned up here.
async fn copy_stream_to_local_file(
    &self,
    stream: Box<dyn stream::Stream<Item = ByteStreamItem> + Unpin + Send>,
    file_handle: tokio::fs::File,
) -> Result<u64, Box<HnswIndexProviderFileError>> {
    let mut total_bytes_written = 0;
    // Rebind as mutable: we consume the stream and advance the file cursor.
    let mut file_handle = file_handle;
    let mut stream = stream;
    while let Some(res) = stream.next().await {
        // A chunk-level error aborts the whole copy.
        let chunk = match res {
            Ok(chunk) => chunk,
            Err(e) => {
                return Err(Box::new(HnswIndexProviderFileError::StorageGetError(e)));
            }
        };

        let res = file_handle.write_all(&chunk).await;
        match res {
            Ok(_) => {
                // Count bytes only after they have been fully written.
                total_bytes_written += chunk.len() as u64;
            }
            Err(e) => {
                tracing::error!("Failed to copy file: {}", e);
                return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
            }
        }
    }
    // Flush so buffered bytes reach the OS before reporting the total.
    match file_handle.flush().await {
        Ok(_) => Ok(total_bytes_written),
        Err(e) => {
            return Err(Box::new(HnswIndexProviderFileError::IOError(e)));
        }
    }
}

pub async fn open(
&self,
id: &Uuid,
Expand Down
4 changes: 2 additions & 2 deletions rust/storage/src/admissioncontrolleds3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ impl AdmissionControlledS3Storage {
) -> Result<Arc<Vec<u8>>, AdmissionControlledS3StorageError> {
let bytes_res = storage
.get(&key)
.instrument(tracing::trace_span!(parent: Span::current(), "Storage get"))
.instrument(tracing::trace_span!(parent: Span::current(), "S3 get"))
.await;
match bytes_res {
Ok(bytes) => {
return Ok(bytes);
}
Err(e) => {
tracing::error!("Error reading from storage: {}", e);
tracing::error!("Error reading from s3: {}", e);
return Err(AdmissionControlledS3StorageError::S3GetError(e));
}
}
Expand Down
6 changes: 3 additions & 3 deletions rust/storage/src/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,9 @@ impl S3Storage {
pub async fn get(&self, key: &str) -> Result<Arc<Vec<u8>>, S3GetError> {
let mut stream = self
.get_stream(key)
.instrument(tracing::trace_span!(parent: Span::current(), "Storage get"))
.instrument(tracing::trace_span!(parent: Span::current(), "S3 get stream"))
.await?;
let read_block_span = tracing::trace_span!(parent: Span::current(), "Read bytes to end");
let read_block_span = tracing::trace_span!(parent: Span::current(), "S3 read bytes to end");
let buf = read_block_span
.in_scope(|| async {
let mut buf: Vec<u8> = Vec::new();
Expand All @@ -184,7 +184,7 @@ impl S3Storage {
buf.extend(chunk);
}
Err(err) => {
tracing::error!("Error reading from storage: {}", err);
tracing::error!("Error reading from S3: {}", err);
match err {
GetError::S3Error(e) => {
return Err(e);
Expand Down

0 comments on commit 2b098f8

Please sign in to comment.