diff --git a/src/types/operator/operator.rs b/src/types/operator/operator.rs index 9b700dd8bba8..be757d50a837 100644 --- a/src/types/operator/operator.rs +++ b/src/types/operator/operator.rs @@ -686,7 +686,7 @@ impl Operator { /// # Ok(()) /// # } /// ``` - pub async fn remove_via(&self, mut input: impl Stream + Unpin) -> Result<()> { + pub async fn remove_via(&self, input: impl Stream + Unpin) -> Result<()> { if self.info().can_batch() { let mut input = input.map(|v| (v, OpDelete::default())).chunks(self.limit()); @@ -704,9 +704,13 @@ impl Operator { } } } else { - while let Some(path) = input.next().await { - self.inner().delete(&path, OpDelete::default()).await?; - } + input + .map(Ok) + .try_for_each_concurrent(self.limit, |path| async move { + let _ = self.inner().delete(&path, OpDelete::default()).await?; + Ok::<(), Error>(()) + }) + .await?; } Ok(())