Problem with tokio::sync::watch channel behavior #3643

Closed
reza-ebrahimi opened this issue Mar 25, 2021 · 5 comments
Labels
A-tokio Area: The main tokio crate C-bug Category: This is a bug. M-sync Module: tokio/sync

Comments

@reza-ebrahimi

reza-ebrahimi commented Mar 25, 2021

Version

[dependencies]
futures = "0.3"
tokio = { version = "1.1.0", features = ["full"] }
$ cargo tree | grep tokio
└── tokio v1.4.0
    └── tokio-macros v1.1.0 (proc-macro)

Platform

$ uname -a
Linux dev 5.4.0-67-generic #75~18.04.1-Ubuntu SMP Tue Feb 23 19:17:50 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

Description

The receiver half of a watch channel is cloned and polled inside a Stream, but it does not work as expected.
I expected it to behave like a regular channel receiver.

use futures::prelude::*;
use std::result::Result;
use std::task::{Context, Poll};
use std::{future::Future, pin::Pin};
use tokio::sync::watch;
use tokio::sync::watch::error::RecvError;
use tokio::time::{sleep, Duration};

#[derive(Clone)]
struct Subscriber {
    pub receiver: watch::Receiver<String>,
}

impl Stream for Subscriber {
    type Item = Result<String, RecvError>;

    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let is_ready = {
            let mut fut: Pin<Box<dyn Future<Output = Result<(), RecvError>>>> =
                Box::pin(self.receiver.changed());
            Future::poll(fut.as_mut(), ctx).is_ready()
        };

        if is_ready {
            return Poll::Ready(Some(Result::Ok((*self.receiver.borrow()).clone())));
        }

        // With this line, the stream is polled repeatedly and the
        // channel's last value is printed to the console.
        // Without this line, the stream is never polled more than once,
        // and the channel's last value is only received at random.
        ctx.waker().wake_by_ref();

        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("init".to_string());

    tokio::spawn(async move {
        let mut stream = Box::new(Subscriber {
            receiver: rx.clone(),
        })
        .map(|payload| {
            println!("received rx = {:?}", payload);
        });

        loop {
            stream.next().await;
        }
    });

    tokio::spawn(async move {
        tx.send("Hello".to_string()).unwrap();
        sleep(Duration::from_millis(5)).await; // this sleep is needed so that "World" does not overwrite "Hello" too quickly
        tx.send("World".to_string()).unwrap();
    });

    sleep(Duration::from_millis(2000)).await;
}
@reza-ebrahimi reza-ebrahimi added A-tokio Area: The main tokio crate C-bug Category: This is a bug. labels Mar 25, 2021
@Darksonn Darksonn added the M-sync Module: tokio/sync label Mar 25, 2021
@Darksonn
Contributor

This is because, by dropping the future returned by self.receiver.changed() when it goes out of scope, you are no longer guaranteed to receive wakeups. To use a watch channel in a poll method, use WatchStream instead.
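The effect described above can be reproduced without tokio. The following is a minimal std-only sketch, not tokio's actual implementation: `Shared`, `Changed`, `send`, and `wakeups_after_send` are hypothetical names invented for illustration. It assumes a model in which a pending future registers its waker with the channel when polled and removes that registration when it is dropped, so a future that is created, polled once, and dropped inside `poll_next` leaves nothing behind for the sender to wake.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical stand-in for the channel's shared notification state.
#[derive(Default)]
struct Shared {
    version: usize,
    next_id: usize,
    // Wakers of futures that are alive and currently waiting.
    waiters: Vec<(usize, Waker)>,
}

// Hypothetical future, loosely modelled on what `changed()` returns.
struct Changed {
    shared: Arc<Mutex<Shared>>,
    seen: usize,
    id: usize,
}

impl Changed {
    fn new(shared: &Arc<Mutex<Shared>>) -> Self {
        let mut s = shared.lock().unwrap();
        let id = s.next_id;
        s.next_id += 1;
        let seen = s.version;
        Changed { shared: Arc::clone(shared), seen, id }
    }
}

impl Future for Changed {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let mut s = self.shared.lock().unwrap();
        if s.version != self.seen {
            return Poll::Ready(());
        }
        // Register (or refresh) this future's waker.
        let id = self.id;
        s.waiters.retain(|(w, _)| *w != id);
        s.waiters.push((id, cx.waker().clone()));
        Poll::Pending
    }
}

impl Drop for Changed {
    fn drop(&mut self) {
        // Assumption of this model: dropping the future deregisters its waker.
        let id = self.id;
        self.shared.lock().unwrap().waiters.retain(|(w, _)| *w != id);
    }
}

// Bump the version and wake every future that is still registered.
fn send(shared: &Arc<Mutex<Shared>>) {
    let mut s = shared.lock().unwrap();
    s.version += 1;
    let waiters = std::mem::take(&mut s.waiters);
    drop(s);
    for (_, waker) in waiters {
        waker.wake();
    }
}

// Waker that only counts how often it is woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

// Poll a `Changed` future once, optionally drop it, then send.
// Returns how many wakeups the task's waker received.
fn wakeups_after_send(keep_future: bool) -> usize {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(Arc::clone(&counter));
    let mut cx = Context::from_waker(&waker);

    let mut fut = Box::pin(Changed::new(&shared));
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    let kept = if keep_future {
        Some(fut)
    } else {
        drop(fut); // what happens at the end of the block in `poll_next`
        None
    };
    send(&shared);
    drop(kept);
    counter.0.load(Ordering::SeqCst)
}

fn main() {
    println!("future kept:    {} wakeup(s)", wakeups_after_send(true));
    println!("future dropped: {} wakeup(s)", wakeups_after_send(false));
}
```

In this model, keeping the future alive across the send yields one wakeup, while dropping it first yields none. A wrapper such as WatchStream avoids the problem by keeping the pending future alive between polls instead of recreating it each time.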

@reza-ebrahimi
Author

@Darksonn Thank you so much! Resolved.

use tokio::sync::watch;
use tokio_stream::{wrappers::WatchStream, StreamExt};

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("hello");
    let mut stream = WatchStream::new(rx).map(|payload| {
        println!("{}", payload);
    });

    stream.next().await;
    tx.send("goodbye").unwrap();
    stream.next().await;
}

@reza-ebrahimi
Author

@Darksonn Awaiting stream.next() in a loop prints the last channel value twice, even though goodbye is sent only once.

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel("hello");

    let mut stream = WatchStream::new(rx).map(|payload| {
        println!("{}", payload);
    });

    let task = tokio::spawn(async move {
        loop {
          stream.next().await;
        }
    });

    tx.send("goodbye").unwrap();

    task.await.ok();
}

Output:

goodbye
goodbye

@Darksonn
Contributor

Ah, there's a good reason for that to happen, but it would be nice to fix. Can you open a new bug-report with just that?

@reza-ebrahimi
Author

Sure, #3655.
