From 1e439bf80d0d6cf76994946b34eb2a99c2ba4bb6 Mon Sep 17 00:00:00 2001 From: Wenyu Huang Date: Tue, 7 Mar 2023 15:48:15 +0800 Subject: [PATCH] feat: remove_via can delete files concurrently Signed-off-by: Wenyu Huang --- .github/workflows/service_test_memcached.yml | 2 +- src/services/memcached/ascii.rs | 14 +++++++++++++- src/types/operator/operator.rs | 12 ++++++++---- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/.github/workflows/service_test_memcached.yml b/.github/workflows/service_test_memcached.yml index 1f12255d4ce..c8f198ed8d6 100644 --- a/.github/workflows/service_test_memcached.yml +++ b/.github/workflows/service_test_memcached.yml @@ -39,7 +39,7 @@ jobs: # Setup memcached server services: memcached: - image: bitnami/memcached + image: bitnami/memcached:1.6.14 env: # memcache's max item size is 1MiB, But opendal's behavior tests # will produce larger file. diff --git a/src/services/memcached/ascii.rs b/src/services/memcached/ascii.rs index 41d4f8e938c..6d7a9280ce5 100644 --- a/src/services/memcached/ascii.rs +++ b/src/services/memcached/ascii.rs @@ -87,9 +87,21 @@ where /// Delete a key and don't wait for response. pub async fn delete(&mut self, key: K) -> Result<(), Error> { - let header = format!("delete {} noreply\r\n", key); + let header = format!("delete {}\r\n", key); self.io.write_all(header.as_bytes()).await?; self.io.flush().await?; + + // Read response header + let header = self.read_line().await?; + let header = std::str::from_utf8(header).map_err(|_| ErrorKind::InvalidData)?; + // Check response header and parse value length + if header.contains("NOT_FOUND"){ + return Ok(()); + } else if header.starts_with("END") { + return Err(ErrorKind::NotFound.into()); + } else if header.contains("ERROR") || !header.contains("DELETED") { + return Err(Error::new(ErrorKind::Other, header)); + } Ok(()) } diff --git a/src/types/operator/operator.rs b/src/types/operator/operator.rs index ddfa34fc59d..83934356f2a 100644 --- a/src/types/operator/operator.rs +++ b/src/types/operator/operator.rs @@ -677,7 +677,7 @@ impl Operator { /// # Ok(()) /// # } /// ``` - pub async fn remove_via(&self, mut input: impl Stream + Unpin) -> Result<()> { + pub async fn remove_via(&self, input: impl Stream + Unpin) -> Result<()> { if self.info().can_batch() { let mut input = input.map(|v| (v, OpDelete::default())).chunks(self.limit()); @@ -695,9 +695,13 @@ impl Operator { } } } else { - while let Some(path) = input.next().await { - self.inner().delete(&path, OpDelete::default()).await?; - } + input + .map(Ok) + .try_for_each_concurrent(self.limit, |path| async move { + let _ = self.inner().delete(&path, OpDelete::default()).await?; + Ok::<(), Error>(()) + }) + .await?; } Ok(())