Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sanketkedia committed Aug 19, 2024
1 parent d82f24a commit f642b5c
Showing 1 changed file with 27 additions and 25 deletions.
52 changes: 27 additions & 25 deletions rust/storage/src/admissioncontrolleds3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,33 +113,21 @@ impl AdmissionControlledS3Storage {
}
}

async fn enter(&self) -> SemaphorePermit<'_> {
match &*self.rate_limiter {
RateLimitPolicy::CountBasedPolicy(policy) => {
return policy.acquire().await;
}
}
}

async fn exit(&self, permit: SemaphorePermit<'_>) {
match &*self.rate_limiter {
RateLimitPolicy::CountBasedPolicy(policy) => {
policy.drop(permit).await;
}
}
}

pub async fn get(
&self,
key: String,
) -> Result<Arc<Vec<u8>>, AdmissionControlledS3StorageError> {
let permit = self.enter().await;
// If there is a duplicate request and the original request finishes
// before we look it up in the map below then we will end up with another
// request to S3. We rely on synchronization on the cache
// by the upstream consumer to make sure that this works correctly.
let future_to_await;
let is_dupe: bool;
{
let mut requests = self.outstanding_requests.lock();
let maybe_inflight = requests.get(&key).map(|fut| fut.clone());
future_to_await = match maybe_inflight {
Some(fut) => fut,
(future_to_await, is_dupe) = match maybe_inflight {
Some(fut) => (fut, true),
None => {
let get_storage_future = AdmissionControlledS3Storage::read_from_storage(
self.storage.clone(),
Expand All @@ -148,18 +136,25 @@ impl AdmissionControlledS3Storage {
.boxed()
.shared();
requests.insert(key.clone(), get_storage_future.clone());
get_storage_future
(get_storage_future, false)
}
};
}

// Acquire permit.
let permit: SemaphorePermit<'_>;
if is_dupe {
permit = self.rate_limiter.enter().await;
}

let res = future_to_await.await;
{
let mut requests = self.outstanding_requests.lock();
requests.remove(&key);
}
// Release permit.
self.exit(permit).await;

res
// Permit gets dropped here since it is RAII.
}

pub async fn put_file(&self, key: &str, path: &str) -> Result<(), S3PutError> {
Expand Down Expand Up @@ -197,6 +192,16 @@ enum RateLimitPolicy {
CountBasedPolicy(CountBasedPolicy),
}

impl RateLimitPolicy {
async fn enter(&self) -> SemaphorePermit<'_> {
match self {
RateLimitPolicy::CountBasedPolicy(policy) => {
return policy.acquire().await;
}
}
}
}

#[derive(Debug)]
struct CountBasedPolicy {
max_allowed_outstanding: usize,
Expand All @@ -219,9 +224,6 @@ impl CountBasedPolicy {
Err(e) => panic!("AcquireToken Failed {}", e),
}
}
async fn drop(&self, permit: SemaphorePermit<'_>) {
drop(permit);
}
}

#[async_trait]
Expand Down

0 comments on commit f642b5c

Please sign in to comment.