Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio-stream: add wrapper for broadcast and watch #3384

Merged
merged 19 commits into from
Feb 5, 2021

Conversation

liufuyang
Copy link
Contributor

Signed-off-by: Fuyang Liu [email protected]

Motivation

Closes #3382

Solution

The basic sketch is going to be:

  • Make the constructor takes a broadcast receiver
  • Then it uses async-stream to turn that into an impl Stream
  • Still inside the constructor, call Box::pin on that stream, the put the pinned box into the struct
  • Then your impl Stream on Wrapper just forwards calls to poll_next to the pinned box

@taiki-e taiki-e added A-tokio-stream Area: The tokio-stream crate M-stream Module: tokio/stream labels Jan 10, 2021
@Darksonn
Copy link
Contributor

Darksonn commented Jan 21, 2021

The Unpin bound can be removed once tokio-rs/async-stream#52 is released.

As for the Send/Sync bounds, I believe it would be sound to take them away with the right unsafe, but the question is if it's a good idea.

@liufuyang
Copy link
Contributor Author

That sounds like some difficult question for me to answer :)

tokio-stream/src/wrappers/broadcast.rs Outdated Show resolved Hide resolved
tokio-stream/src/wrappers/broadcast.rs Outdated Show resolved Hide resolved
tokio-stream/src/wrappers/broadcast.rs Outdated Show resolved Hide resolved
tokio-stream/src/wrappers/broadcast.rs Outdated Show resolved Hide resolved
tokio-stream/src/wrappers/broadcast.rs Outdated Show resolved Hide resolved
@liufuyang liufuyang force-pushed the tokio-stream-3382 branch 2 times, most recently from 354649a to f1c0143 Compare January 24, 2021 10:08
@liufuyang liufuyang changed the title tokio-stream: add wrapper for broadcast tokio-stream: add wrapper for broadcast and watch Jan 25, 2021
Comment on lines 13 to 24
pub struct WatchStream<T> {
inner: Pin<Box<dyn Stream<Item = ()>>>,
_marker: std::marker::PhantomData<T>,
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like a problem to have the Stream return (). The user of the Stream would want the actual item, though I think we might have to clone it to do that?

Copy link
Contributor Author

@liufuyang liufuyang Jan 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I thought users would mostly use this to see whether something got arrived? (Maybe I am wrong...) Or maybe it is nice to have it return the last arrived value as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or can I justyield rx.borrow() ? Instead of yield a ()? I am trying this and now having some difficulty to deal with the life-times.

tokio-stream/Cargo.toml Outdated Show resolved Hide resolved
Ok((item, rx))
}

impl<T: Clone + Unpin + 'static + Send + Sync> BroadcastStream<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the new change, some of these bounds can be removed.

/// [`tokio::sync::broadcast::Receiver`]: struct@tokio::sync::broadcast::Receiver
/// [`Stream`]: trait@crate::Stream
pub struct BroadcastStream<T> {
inner: ReusableBoxFuture<Result<(T, Receiver<T>), RecvError>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to return the receiver even if it emits an error, as the stream would otherwise not be usable again after returning an error.

Comment on lines 30 to 43
enum WrappedRecvError<T> {
Lagged(u64, Receiver<T>),
Closed,
}

async fn make_future<T: Clone>(
mut rx: Receiver<T>,
) -> Result<(T, Receiver<T>), WrappedRecvError<T>> {
match rx.recv().await {
Ok(item) => Ok((item, rx)),
Err(RecvError::Lagged(n)) => Err(WrappedRecvError::Lagged(n, rx)),
Err(RecvError::Closed) => Err(WrappedRecvError::Closed),
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to simplify to the following.

Suggested change
enum WrappedRecvError<T> {
Lagged(u64, Receiver<T>),
Closed,
}
async fn make_future<T: Clone>(
mut rx: Receiver<T>,
) -> Result<(T, Receiver<T>), WrappedRecvError<T>> {
match rx.recv().await {
Ok(item) => Ok((item, rx)),
Err(RecvError::Lagged(n)) => Err(WrappedRecvError::Lagged(n, rx)),
Err(RecvError::Closed) => Err(WrappedRecvError::Closed),
}
}
async fn make_future<T: Clone>(
mut rx: Receiver<T>,
) -> (Result<T, RecvError<T>>, Receiver<T>) {
let result = rx.recv().await;
(result, rx)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks much better 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks much better 😄

Comment on lines 57 to 69
match ready!(self.inner.poll(cx)) {
Ok((item, rx)) => {
self.inner.set(make_future(rx));
Poll::Ready(Some(Ok(item)))
}
Err(err) => match err {
WrappedRecvError::Closed => Poll::Ready(None),
WrappedRecvError::Lagged(n, rx) => {
self.inner.set(make_future(rx));
Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n))))
}
},
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to call self.inner.set(make_future(rx)) in all three cases because then, if the user calls poll_next after WrappedRecvError::Closed, then we will just return Poll::Ready(None) again.

With this implementation, it will panic if polled again after the first WrappedRecvError::Closed.

You should be able to avoid duplicating the call three times by doing it before the match (but after the self.inner.poll call).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good catch. Thank you 👍

Comment on lines 48 to 54
match result {
Ok(item) => Poll::Ready(Some(Ok(item))),
Err(err) => match err {
RecvError::Closed => Poll::Ready(None),
RecvError::Lagged(n) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))),
},
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be simplified.

Suggested change
match result {
Ok(item) => Poll::Ready(Some(Ok(item))),
Err(err) => match err {
RecvError::Closed => Poll::Ready(None),
RecvError::Lagged(n) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))),
},
}
match result {
Ok(item) => Poll::Ready(Some(Ok(item))),
Err(RecvError::Closed) => Poll::Ready(None),
Err(RecvError::Lagged(n)) => Poll::Ready(Some(Err(BroadcastStreamRecvError::Lagged(n)))),
}

}
}

impl<T: Clone> fmt::Debug for BroadcastStream<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
impl<T: Clone> fmt::Debug for BroadcastStream<T> {
impl<T> fmt::Debug for BroadcastStream<T> {

Comment on lines 19 to 24
async fn make_future<T: Clone + Send + Sync>(
mut rx: Receiver<T>,
) -> Result<((), Receiver<T>), RecvError> {
let signal = rx.changed().await?;
Ok((signal, rx))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same change here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, sorry I forgot this one.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems good to me now!

@Darksonn Darksonn merged commit 1c1e0e3 into tokio-rs:master Feb 5, 2021
@liufuyang liufuyang deleted the tokio-stream-3382 branch February 6, 2021 09:59
bors bot pushed a commit to sigp/lighthouse that referenced this pull request Feb 10, 2021
## Issue Addressed

resolves #2129
resolves #2099 
addresses some of #1712
unblocks #2076
unblocks #2153 

## Proposed Changes

- Updates all the dependencies mentioned in #2129, except for web3. They haven't merged their tokio 1.0 update because they are waiting on some dependencies of their own. Since we only use web3 in tests, I think updating it in a separate issue is fine. If they are able to merge soon though, I can update in this PR. 

- Updates `tokio_util` to 0.6.2 and `bytes` to 1.0.1.

- We haven't made a discv5 release since merging tokio 1.0 updates so I'm using a commit rather than release atm. **Edit:** I think we should merge an update of `tokio_util` to 0.6.2 into discv5 before this release because it has panic fixes in `DelayQueue`  --> PR in discv5:  sigp/discv5#58

## Additional Info

tokio 1.0 changes that required some changes in lighthouse:

- `interval.next().await.is_some()` -> `interval.tick().await`
- `sleep` future is now `!Unpin` -> tokio-rs/tokio#3028
- `try_recv` has been temporarily removed from `mpsc` -> tokio-rs/tokio#3350
- stream features have moved to `tokio-stream` and `broadcast::Receiver::into_stream()` has been temporarily removed -> `tokio-rs/tokio#2870
- I've copied over the `BroadcastStream` wrapper from this PR, but can update to use `tokio-stream` once it's merged tokio-rs/tokio#3384

Co-authored-by: realbigsean <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio-stream Area: The tokio-stream crate M-stream Module: tokio/stream
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add tokio-stream wrappers for watch and broadcast receivers
3 participants