Skip to content

Commit

Permalink
sync: fix watch wrapper (#3914)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Jul 2, 2021
1 parent 677107d commit 8170e27
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
2 changes: 1 addition & 1 deletion tokio-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ signal = ["tokio/signal"]
[dependencies]
futures-core = { version = "0.3.0" }
pin-project-lite = "0.2.0"
tokio = { version = "1.2.0", path = "../tokio", features = ["sync"] }
tokio = { version = "1.8.0", path = "../tokio", features = ["sync"] }
tokio-util = { version = "0.6.3", path = "../tokio-util", optional = true }

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions tokio-stream/src/wrappers/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ impl<T: Clone + 'static + Send + Sync> Stream for WatchStream<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let (result, rx) = ready!(self.inner.poll(cx));
let (result, mut rx) = ready!(self.inner.poll(cx));
match result {
Ok(_) => {
let received = (*rx.borrow()).clone();
let received = (*rx.borrow_and_update()).clone();
self.inner.set(make_future(rx));
Poll::Ready(Some(received))
}
Expand Down
27 changes: 27 additions & 0 deletions tokio-stream/tests/watch.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
use tokio_stream::StreamExt;

#[tokio::test]
async fn message_not_twice() {
let (tx, rx) = watch::channel("hello");

let mut counter = 0;
let mut stream = WatchStream::new(rx).map(move |payload| {
println!("{}", payload);
if payload == "goodbye" {
counter += 1;
}
if counter >= 2 {
panic!("too many goodbyes");
}
});

let task = tokio::spawn(async move { while stream.next().await.is_some() {} });

// Send goodbye just once
tx.send("goodbye").unwrap();

drop(tx);
task.await.unwrap();
}

0 comments on commit 8170e27

Please sign in to comment.