Skip to content

Commit

Permalink
Add rt channel Receive Future
Browse files Browse the repository at this point in the history
This change the previous async function to always first check channel
for message, before waiting MsgListener for another signal.
  • Loading branch information
Thomasdezeeuw committed Aug 11, 2023
1 parent 9cabef8 commit 3bea44a
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions rt/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
// TODO: remove `rt::channel` entirely and replace it with an `ActorRef` to the
// `worker::comm_actor`.

use std::future::poll_fn;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::mpsc;
use std::task::Poll;
use std::task::{self, Poll};

use a10::msg::{MsgListener, MsgToken};

Expand Down Expand Up @@ -55,24 +55,36 @@ pub(crate) struct Receiver<T> {

impl<T> Receiver<T> {
/// Receive a message from the channel.
pub(crate) async fn recv(&mut self) -> Option<T> {
pub(crate) fn recv<'r>(&'r mut self) -> Receive<'r, T> {
Receive { receiver: self }
}
}

/// [`Future`] behind [`Receiver::recv`].
pub(crate) struct Receive<'r, T> {
receiver: &'r mut Receiver<T>,
}

impl<'r, T> Future for Receive<'r, T> {
type Output = Option<T>;

fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<Self::Output> {
let receiver = &mut *self.receiver;
loop {
poll_fn(|ctx| {
no_ring_ctx!(ctx);
match Pin::new(&mut self.listener).poll_next(ctx) {
Poll::Ready(data) => {
debug_assert_eq!(data, Some(WAKE));
Poll::Ready(())
}
Poll::Pending => Poll::Pending,
}
})
.await;
// Check if we have a message first.
if let Ok(msg) = receiver.receiver.try_recv() {
return Poll::Ready(Some(msg));
}

match self.receiver.try_recv() {
Ok(msg) => return Some(msg),
Err(mpsc::TryRecvError::Empty) => continue,
Err(mpsc::TryRecvError::Disconnected) => return None,
// If not wait until we get a signal that another message is
// available.
no_ring_ctx!(ctx);
match Pin::new(&mut receiver.listener).poll_next(ctx) {
Poll::Ready(data) => {
debug_assert_eq!(data, Some(WAKE));
continue;
}
Poll::Pending => return Poll::Pending,
}
}
}
Expand Down

0 comments on commit 3bea44a

Please sign in to comment.