Skip to content

Commit

Permalink
feat: remove_via can delete files concurrently
Browse files Browse the repository at this point in the history
Signed-off-by: Wenyu Huang <[email protected]>
  • Loading branch information
uran0sH committed Mar 10, 2023
1 parent f4cdc2b commit 1e439bf
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/service_test_memcached.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
# Setup memcached server
services:
memcached:
image: bitnami/memcached
image: bitnami/memcached:1.6.14
env:
# memcache's max item size is 1MiB, But opendal's behavior tests
# will produce larger file.
Expand Down
14 changes: 13 additions & 1 deletion src/services/memcached/ascii.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,21 @@ where

/// Delete a key and don't wait for response.
pub async fn delete<K: Display>(&mut self, key: K) -> Result<(), Error> {
let header = format!("delete {} noreply\r\n", key);
let header = format!("delete {}\r\n", key);
self.io.write_all(header.as_bytes()).await?;
self.io.flush().await?;

// Read response header
let header = self.read_line().await?;
let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?;
// Check response header and parse value length
if header.contains("NOT_FOUND"){
return Ok(());
} else if header.starts_with("END") {
return Err(ErrorKind::NotFound.into());
} else if header.contains("ERROR") || !header.contains("DELETED") {
return Err(Error::new(ErrorKind::Other, header));
}
Ok(())
}

Expand Down
12 changes: 8 additions & 4 deletions src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ impl Operator {
/// # Ok(())
/// # }
/// ```
pub async fn remove_via(&self, mut input: impl Stream<Item = String> + Unpin) -> Result<()> {
pub async fn remove_via(&self, input: impl Stream<Item = String> + Unpin) -> Result<()> {
if self.info().can_batch() {
let mut input = input.map(|v| (v, OpDelete::default())).chunks(self.limit());

Expand All @@ -695,9 +695,13 @@ impl Operator {
}
}
} else {
while let Some(path) = input.next().await {
self.inner().delete(&path, OpDelete::default()).await?;
}
input
.map(Ok)
.try_for_each_concurrent(self.limit, |path| async move {
let _ = self.inner().delete(&path, OpDelete::default()).await?;
Ok::<(), Error>(())
})
.await?;
}

Ok(())
Expand Down

0 comments on commit 1e439bf

Please sign in to comment.