Skip to content

Commit

Permalink
Point services to admission controlled storage
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Aug 9, 2024
1 parent 4da3846 commit 5737fed
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 64 deletions.
4 changes: 2 additions & 2 deletions rust/blockstore/src/arrow/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl BlockManager {
// TODO: NAC register/deregister/validation goes here.
async {
let key = format!("block/{}", id);
let stream = self.storage.get_internal(&key).instrument(
let stream = self.storage.get_stream(&key).instrument(
tracing::trace_span!(parent: Span::current(), "BlockManager storage get"),
).await;
match stream {
Expand Down Expand Up @@ -341,7 +341,7 @@ impl SparseIndexManager {
tracing::info!("Cache miss - fetching sparse index from storage");
let key = format!("sparse_index/{}", id);
tracing::debug!("Reading sparse index from storage with key: {}", key);
let stream = self.storage.get_internal(&key).await;
let stream = self.storage.get_stream(&key).await;
let mut buf: Vec<u8> = Vec::new();
match stream {
Ok(mut bytes) => {
Expand Down
2 changes: 1 addition & 1 deletion rust/index/src/hnsw_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl HnswIndexProvider {
for file in FILES.iter() {
let key = self.format_key(source_id, file);
tracing::info!("Loading hnsw index file: {}", key);
let stream = self.storage.get_internal(&key).await;
let stream = self.storage.get_stream(&key).await;
let reader = match stream {
Ok(reader) => reader,
Err(e) => {
Expand Down
50 changes: 48 additions & 2 deletions rust/storage/src/admissioncontrolleds3.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use super::GetError;
use crate::s3::{S3GetError, S3PutError, S3Storage};
use crate::{
config::StorageConfig,
s3::{S3GetError, S3PutError, S3Storage, StorageConfigError},
stream::ByteStreamItem,
};
use async_trait::async_trait;
use chroma_config::Configurable;
use chroma_error::{ChromaError, ErrorCodes};
use futures::{future::Shared, FutureExt, StreamExt};
use futures::{future::Shared, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use thiserror::Error;
Expand Down Expand Up @@ -56,6 +62,28 @@ impl AdmissionControlledS3Storage {
}
}

// TODO: Remove this once the upstream consumers switch to non-streaming APIs.
/// Fetches the object stored under `key` as a stream of byte chunks.
///
/// The call is forwarded directly to the wrapped storage's `get_stream`,
/// instrumented with a "Storage get" trace span parented to the current span.
///
/// # Errors
/// Any error from the wrapped storage is logged and returned wrapped in
/// `AdmissionControlledS3StorageError::S3GetError`.
pub async fn get_stream(
    &self,
    key: &str,
) -> Result<
    Box<dyn Stream<Item = ByteStreamItem> + Unpin + Send>,
    AdmissionControlledS3StorageError,
> {
    let outcome = self
        .storage
        .get_stream(key)
        .instrument(tracing::trace_span!(parent: Span::current(), "Storage get"))
        .await;
    match outcome {
        Err(e) => {
            // Log at the point of failure before converting to this layer's error type.
            tracing::error!("Error reading from storage: {}", e);
            Err(AdmissionControlledS3StorageError::S3GetError(e))
        }
        Ok(stream) => Ok(stream),
    }
}

async fn read_from_storage(
storage: S3Storage,
key: String,
Expand Down Expand Up @@ -153,3 +181,21 @@ impl AdmissionControlledS3Storage {
self.storage.put_bytes(key, bytes).await
}
}

#[async_trait]
impl Configurable<StorageConfig> for AdmissionControlledS3Storage {
    /// Builds an `AdmissionControlledS3Storage` from a `StorageConfig`.
    ///
    /// Only the `StorageConfig::AdmissionControlledS3` variant is accepted. Its
    /// nested `s3_config` is used to construct the wrapped `S3Storage`.
    ///
    /// # Errors
    /// - `StorageConfigError::InvalidStorageConfig` for any other variant.
    /// - Whatever error `S3Storage::try_from_config` reports.
    async fn try_from_config(config: &StorageConfig) -> Result<Self, Box<dyn ChromaError>> {
        // Reject every variant except the admission-controlled one up front.
        let nacconfig = match config {
            StorageConfig::AdmissionControlledS3(nacconfig) => nacconfig,
            _ => return Err(Box::new(StorageConfigError::InvalidStorageConfig)),
        };

        // S3Storage is configured through the generic StorageConfig, so re-wrap
        // a copy of the nested S3 settings in the S3 variant.
        let inner_config = StorageConfig::S3(nacconfig.s3_config.clone());
        let s3_storage = S3Storage::try_from_config(&inner_config).await?;
        Ok(Self::new(s3_storage))
    }
}
13 changes: 10 additions & 3 deletions rust/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ pub enum StorageConfig {
S3(S3StorageConfig),
#[serde(alias = "local")]
Local(LocalStorageConfig),
#[serde(alias = "admissioncontrolleds3")]
AdmissionControlledS3(AdmissionControlledS3StorageConfig),
}

#[derive(Deserialize, PartialEq, Debug)]
#[derive(Deserialize, PartialEq, Debug, Clone)]
pub enum S3CredentialsConfig {
Minio,
AWS,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
/// The configuration for the s3 storage type
/// # Fields
/// - bucket: The name of the bucket to use.
Expand All @@ -33,7 +35,7 @@ pub struct S3StorageConfig {
pub upload_part_size_bytes: usize,
}

#[derive(Deserialize, Debug)]
#[derive(Deserialize, Debug, Clone)]
/// The configuration for the local storage type
/// # Fields
/// - root: The root directory to use for storage.
Expand All @@ -43,3 +45,8 @@ pub struct S3StorageConfig {
pub struct LocalStorageConfig {
pub root: String,
}

#[derive(Deserialize, Debug, Clone)]
/// The configuration for the admission controlled s3 storage type
/// # Fields
/// - s3_config: The configuration of the underlying s3 storage.
pub struct AdmissionControlledS3StorageConfig {
pub s3_config: S3StorageConfig,
}
19 changes: 17 additions & 2 deletions rust/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ impl Storage {
}
}

pub async fn get_internal(
// TODO: Remove this once upstream consumers switch to non-streaming APIs.
pub async fn get_stream(
&self,
key: &str,
) -> Result<Box<dyn Stream<Item = ByteStreamItem> + Unpin + Send>, GetError> {
Expand All @@ -116,7 +117,18 @@ impl Storage {
Err(e) => Err(GetError::LocalError(e)),
}
}
_ => unimplemented!(),
Storage::AdmissionControlledS3(admission_controlled_storage) => {
let res = admission_controlled_storage.get_stream(key).await;
match res {
Ok(res) => Ok(res),
Err(e) => match e {
AdmissionControlledS3StorageError::S3GetError(e) => match e {
S3GetError::NoSuchKey(_) => Err(GetError::NoSuchKey(key.to_string())),
_ => Err(GetError::S3Error(e)),
},
},
}
}
}
}

Expand Down Expand Up @@ -161,5 +173,8 @@ pub async fn from_config(config: &StorageConfig) -> Result<Storage, Box<dyn Chro
StorageConfig::Local(_) => Ok(Storage::Local(
local::LocalStorage::try_from_config(config).await?,
)),
StorageConfig::AdmissionControlledS3(_) => Ok(Storage::AdmissionControlledS3(
AdmissionControlledS3Storage::try_from_config(config).await?,
)),
}
}
26 changes: 14 additions & 12 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ query_service:
connect_timeout_ms: 5000
request_timeout_ms: 5000
storage:
S3:
bucket: "chroma-storage"
credentials: "Minio"
connect_timeout_ms: 5000
request_timeout_ms: 30000 # 30 seconds
upload_part_size_bytes: 536870912 # 512MiB
AdmissionControlledS3:
s3_config:
bucket: "chroma-storage"
credentials: "Minio"
connect_timeout_ms: 5000
request_timeout_ms: 30000 # 30 seconds
upload_part_size_bytes: 536870912 # 512MiB
log:
Grpc:
host: "logservice.chroma"
Expand Down Expand Up @@ -76,12 +77,13 @@ compaction_service:
connect_timeout_ms: 5000
request_timeout_ms: 5000
storage:
S3:
bucket: "chroma-storage"
credentials: "Minio"
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
upload_part_size_bytes: 536870912 # 512MiB
AdmissionControlledS3:
s3_config:
bucket: "chroma-storage"
credentials: "Minio"
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
upload_part_size_bytes: 536870912 # 512MiB
log:
Grpc:
host: "logservice.chroma"
Expand Down
91 changes: 49 additions & 42 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -217,12 +218,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -296,12 +298,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -349,12 +352,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -446,12 +450,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -499,12 +504,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -594,12 +600,13 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
storage:
S3:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
AdmissionControlledS3:
s3_config:
bucket: "chroma"
credentials: Minio
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
log:
Grpc:
host: "localhost"
Expand Down

0 comments on commit 5737fed

Please sign in to comment.