Skip to content

Commit

Permalink
refactor: Remove never used Stream poll_reset API (#3774)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored Dec 19, 2023
1 parent 62ca99d commit 41df0a1
Show file tree
Hide file tree
Showing 6 changed files with 0 additions and 144 deletions.
24 changes: 0 additions & 24 deletions core/src/raw/http_util/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,6 @@ impl<T: Part> Stream for MultipartStream<T> {

Poll::Ready(None)
}

/// It's possible to implement reset by calling stream's `poll_reset`.
fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"MultipartStream doesn't support reset yet",
)))
}
}

/// Part is a trait for multipart part.
Expand Down Expand Up @@ -356,14 +348,6 @@ impl Stream for FormDataPartStream {

Poll::Ready(None)
}

/// It's possible to implement reset by calling stream's `poll_reset`.
fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"FormDataPartStream doesn't support reset yet",
)))
}
}

/// MixedPart is a builder for multipart/mixed part.
Expand Down Expand Up @@ -710,14 +694,6 @@ impl Stream for MixedPartStream {

Poll::Ready(None)
}

/// It's possible to implement reset by calling stream's `poll_reset`.
fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"MixedPartStream doesn't support reset yet",
)))
}
}

#[cfg(test)]
Expand Down
7 changes: 0 additions & 7 deletions core/src/raw/oio/buf/chunked_bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,13 +328,6 @@ impl oio::Stream for ChunkedBytes {
None => Poll::Ready(None),
}
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"ChunkedBytes does not support reset",
)))
}
}

impl Stream for ChunkedBytes {
Expand Down
5 changes: 0 additions & 5 deletions core/src/raw/oio/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,4 @@ impl oio::Stream for Cursor {
self.pos += bs.len() as u64;
Poll::Ready(Some(Ok(bs)))
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
self.pos = 0;
Poll::Ready(Ok(()))
}
}
94 changes: 0 additions & 94 deletions core/src/raw/oio/stream/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::ready;
use std::task::Context;
use std::task::Poll;
Expand All @@ -38,9 +37,6 @@ pub type Streamer = Box<dyn Stream>;
pub trait Stream: Unpin + Send + Sync {
/// Poll next item `Result<Bytes>` from the stream.
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>>;

/// Reset this stream to the beginning.
fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>>;
}

impl Stream for () {
Expand All @@ -49,12 +45,6 @@ impl Stream for () {

unimplemented!("poll_next is required to be implemented for oio::Stream")
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
let _ = cx;

unimplemented!("poll_reset is required to be implemented for oio::Stream")
}
}

/// `Box<dyn Stream>` won't implement `Stream` automatically.
Expand All @@ -63,66 +53,12 @@ impl<T: Stream + ?Sized> Stream for Box<T> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
(**self).poll_next(cx)
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
(**self).poll_reset(cx)
}
}

impl Stream for dyn raw::oio::Read {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
raw::oio::Read::poll_next(self, cx)
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
let _ = raw::oio::Read::poll_seek(self, cx, std::io::SeekFrom::Start(0))?;

Poll::Ready(Ok(()))
}
}

impl<T: Stream + ?Sized> Stream for Arc<std::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match self.try_lock() {
Ok(mut this) => this.poll_next(cx),
Err(_) => Poll::Ready(Some(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
)))),
}
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.try_lock() {
Ok(mut this) => this.poll_reset(cx),
Err(_) => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
))),
}
}
}

impl<T: Stream + ?Sized> Stream for Arc<tokio::sync::Mutex<T>> {
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
match self.try_lock() {
Ok(mut this) => this.poll_next(cx),
Err(_) => Poll::Ready(Some(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
)))),
}
}

fn poll_reset(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self.try_lock() {
Ok(mut this) => this.poll_reset(cx),
Err(_) => Poll::Ready(Err(Error::new(
ErrorKind::Unexpected,
"the stream is expected to have only one consumer, but it's not",
))),
}
}
}

impl futures::Stream for dyn Stream {
Expand All @@ -145,11 +81,6 @@ pub trait StreamExt: Stream {
NextFuture { inner: self }
}

/// Build a future for `poll_reset`.
fn reset(&mut self) -> ResetFuture<'_, Self> {
ResetFuture { inner: self }
}

/// Chain this stream with another stream.
fn chain<S>(self, other: S) -> Chain<Self, S>
where
Expand Down Expand Up @@ -192,24 +123,6 @@ where
}
}

/// Make this future `!Unpin` for compatibility with async trait methods.
#[pin_project(!Unpin)]
pub struct ResetFuture<'a, T: Stream + Unpin + ?Sized> {
inner: &'a mut T,
}

impl<T> Future for ResetFuture<'_, T>
where
T: Stream + Unpin + ?Sized,
{
type Output = Result<()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let this = self.project();
Pin::new(this.inner).poll_reset(cx)
}
}

/// Stream for the [`chain`](StreamExt::chain) method.
#[must_use = "streams do nothing unless polled"]
pub struct Chain<S1: Stream, S2: Stream> {
Expand All @@ -228,13 +141,6 @@ impl<S1: Stream, S2: Stream> Stream for Chain<S1, S2> {
}
self.second.poll_next(cx)
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"chained stream doesn't support reset",
)))
}
}

/// Stream for the [`collect`](StreamExt::collect) method.
Expand Down
7 changes: 0 additions & 7 deletions core/src/raw/oio/stream/into_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,4 @@ where
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes>>> {
self.inner.try_poll_next_unpin(cx)
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"IntoStream doesn't support reset",
)))
}
}
7 changes: 0 additions & 7 deletions core/src/raw/oio/stream/into_stream_from_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,4 @@ where
.set_source(err)))),
}
}

fn poll_reset(&mut self, _: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Err(Error::new(
ErrorKind::Unsupported,
"FromReaderStream doesn't support reset",
)))
}
}

0 comments on commit 41df0a1

Please sign in to comment.