Skip to content

Commit

Permalink
Rate limit for NAC
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Aug 9, 2024
1 parent d7d993b commit 57759a6
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 7 deletions.
92 changes: 85 additions & 7 deletions rust/storage/src/admissioncontrolleds3.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::GetError;
use crate::{
config::StorageConfig,
config::{CountBasedPolicyConfig, RateLimitingConfig, StorageConfig},
s3::{S3GetError, S3PutError, S3Storage, StorageConfigError},
stream::ByteStreamItem,
};
Expand All @@ -11,6 +11,7 @@ use futures::{future::Shared, FutureExt, Stream, StreamExt};
use parking_lot::Mutex;
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc};
use thiserror::Error;
use tokio::sync::{Semaphore, SemaphorePermit};
use tracing::{Instrument, Span};

// Wrapper over s3 storage that provides proxy features such as
Expand Down Expand Up @@ -38,6 +39,7 @@ pub struct AdmissionControlledS3Storage {
>,
>,
>,
rate_limiter: Arc<RateLimitPolicy>,
}

#[derive(Error, Debug, Clone)]
Expand All @@ -55,10 +57,19 @@ impl ChromaError for AdmissionControlledS3StorageError {
}

impl AdmissionControlledS3Storage {
pub fn new(storage: S3Storage) -> Self {
pub fn new_with_default_policy(storage: S3Storage) -> Self {
Self {
storage,
outstanding_requests: Arc::new(Mutex::new(HashMap::new())),
rate_limiter: Arc::new(RateLimitPolicy::CountBasedPolicy(CountBasedPolicy::new(15))),
}
}

pub fn new(storage: S3Storage, policy: RateLimitPolicy) -> Self {
Self {
storage,
outstanding_requests: Arc::new(Mutex::new(HashMap::new())),
rate_limiter: Arc::new(policy),
}
}

Expand Down Expand Up @@ -143,10 +154,27 @@ impl AdmissionControlledS3Storage {
}
}

async fn enter(&self) -> SemaphorePermit<'_> {
match &*self.rate_limiter {
RateLimitPolicy::CountBasedPolicy(policy) => {
return policy.acquire().await;
}
}
}

async fn exit(&self, permit: SemaphorePermit<'_>) {
match &*self.rate_limiter {
RateLimitPolicy::CountBasedPolicy(policy) => {
policy.drop(permit).await;
}
}
}

pub async fn get(
&self,
key: String,
) -> Result<Arc<Vec<u8>>, AdmissionControlledS3StorageError> {
let permit = self.enter().await;
let future_to_await;
{
let mut requests = self.outstanding_requests.lock();
Expand All @@ -170,6 +198,8 @@ impl AdmissionControlledS3Storage {
let mut requests = self.outstanding_requests.lock();
requests.remove(&key);
}
// Release permit.
self.exit(permit).await;
res
}

Expand All @@ -187,15 +217,63 @@ impl Configurable<StorageConfig> for AdmissionControlledS3Storage {
async fn try_from_config(config: &StorageConfig) -> Result<Self, Box<dyn ChromaError>> {
match &config {
StorageConfig::AdmissionControlledS3(nacconfig) => {
let s3_storage = S3Storage::try_from_config(&StorageConfig::S3(
nacconfig.s3_config.clone(),
))
.await?;
return Ok(Self::new(s3_storage));
let s3_storage =
S3Storage::try_from_config(&StorageConfig::S3(nacconfig.s3_config.clone()))
.await?;
let policy =
RateLimitPolicy::try_from_config(&nacconfig.rate_limiting_policy).await?;
return Ok(Self::new(s3_storage, policy));
}
_ => {
return Err(Box::new(StorageConfigError::InvalidStorageConfig));
}
}
}
}

// Prefer enum dispatch over dyn since there could
// only be a handful of these policies.
#[derive(Debug)]
enum RateLimitPolicy {
CountBasedPolicy(CountBasedPolicy),
}

#[derive(Debug)]
struct CountBasedPolicy {
max_allowed_outstanding: usize,
remaining_tokens: Semaphore,
}

impl CountBasedPolicy {
fn new(max_allowed_outstanding: usize) -> Self {
Self {
max_allowed_outstanding,
remaining_tokens: Semaphore::new(max_allowed_outstanding),
}
}
async fn acquire(&self) -> SemaphorePermit<'_> {
let token_res = self.remaining_tokens.acquire().await;
match token_res {
Ok(token) => {
return token;
}
Err(e) => panic!("AcquireToken Failed {}", e),
}
}
async fn drop(&self, permit: SemaphorePermit<'_>) {
drop(permit);
}
}

#[async_trait]
impl Configurable<RateLimitingConfig> for RateLimitPolicy {
async fn try_from_config(config: &RateLimitingConfig) -> Result<Self, Box<dyn ChromaError>> {
match &config {
RateLimitingConfig::CountBasedPolicy(count_policy) => {
return Ok(RateLimitPolicy::CountBasedPolicy(CountBasedPolicy::new(
count_policy.max_concurrent_requests,
)));
}
}
}
}
11 changes: 11 additions & 0 deletions rust/storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,15 @@ pub struct LocalStorageConfig {
#[derive(Deserialize, Debug, Clone)]
pub struct AdmissionControlledS3StorageConfig {
pub s3_config: S3StorageConfig,
pub rate_limiting_policy: RateLimitingConfig,
}

#[derive(Deserialize, Debug, Clone)]
pub struct CountBasedPolicyConfig {
pub max_concurrent_requests: usize,
}

#[derive(Deserialize, Debug, Clone)]
pub enum RateLimitingConfig {
CountBasedPolicy(CountBasedPolicyConfig),
}
6 changes: 6 additions & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ query_service:
connect_timeout_ms: 5000
request_timeout_ms: 30000 # 1 minute
upload_part_size_bytes: 536870912 # 512MiB
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "logservice.chroma"
Expand Down Expand Up @@ -84,6 +87,9 @@ compaction_service:
connect_timeout_ms: 5000
request_timeout_ms: 60000 # 1 minute
upload_part_size_bytes: 536870912 # 512MiB
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "logservice.chroma"
Expand Down
21 changes: 21 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -225,6 +228,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -305,6 +311,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -359,6 +368,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -457,6 +469,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -511,6 +526,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down Expand Up @@ -607,6 +625,9 @@ mod tests {
connect_timeout_ms: 5000
request_timeout_ms: 1000
upload_part_size_bytes: 8388608
rate_limiting_policy:
CountBasedPolicy:
max_concurrent_requests: 15
log:
Grpc:
host: "localhost"
Expand Down

0 comments on commit 57759a6

Please sign in to comment.