Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Sink for Sender<T> #25

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ jobs:

- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV

- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'windows')
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"

- name: Install latest ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v2

- name: Set current week of the year in environnement
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV

- uses: actions-rs/toolchain@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/security.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- uses: actions/checkout@v2

- name: Set current week of the year in environnement
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV

- uses: actions-rs/audit-check@v1
with:
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ readme = "README.md"

[dependencies]
concurrent-queue = "1.2.2"
event-listener = "2.4.0"
futures-core = "0.3.5"
event-listener = "2.5.1"
futures-core = "0.3.8"

[dev-dependencies]
blocking = "0.6.0"
easy-parallel = "3.1.0"
futures-lite = "1.11.0"
futures-lite = "1.11.2"
104 changes: 104 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ struct Channel<T> {
/// Stream operations while the channel is empty and not closed.
stream_ops: Event,

/// Sink operations while the channel is empty and not closed.
sink_ops: Event,

/// The number of currently active `Sender`s.
sender_count: AtomicUsize,

Expand Down Expand Up @@ -112,12 +115,15 @@ pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) {
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
sink_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});

let s = Sender {
channel: channel.clone(),
listener: None,
sending_msg: None,
};
let r = Receiver {
channel,
Expand Down Expand Up @@ -151,12 +157,15 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
send_ops: Event::new(),
recv_ops: Event::new(),
stream_ops: Event::new(),
sink_ops: Event::new(),
sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1),
});

let s = Sender {
channel: channel.clone(),
listener: None,
sending_msg: None,
};
let r = Receiver {
channel,
Expand All @@ -174,6 +183,11 @@ pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
pub struct Sender<T> {
/// Inner channel state.
channel: Arc<Channel<T>>,

/// Listens for a recv or close event to unblock this stream.
listener: Option<EventListener>,

sending_msg: Option<T>,
}

impl<T> Sender<T> {
Expand Down Expand Up @@ -421,6 +435,91 @@ impl<T> Sender<T> {
pub fn sender_count(&self) -> usize {
self.channel.sender_count.load(Ordering::SeqCst)
}

/// Attempts to send a message into the channel.
/// This method takes the message inside the `message` argument and buffer it if the channel is full.
/// This method returns `Pending` if the channel is full and `Ready(SendError<T>)` if it is closed.
/// # Panics
/// Panics if call this method with `None` message in the first call.
/// # Examples
///
/// ```
/// use async_channel::{bounded, SendError};
/// use futures_lite::future;
/// use std::task::Poll;

/// future::block_on(async {
/// future::poll_fn(|cx| -> Poll<()> {
/// let (mut s, r) = bounded::<u32>(1);
/// assert_eq!(s.poll_send(cx, &mut Some(1)), Poll::Ready(Ok(())));
/// assert_eq!(s.poll_send(cx, &mut Some(2)), Poll::Pending);
/// drop(r);
/// assert_eq!(
/// s.poll_send(cx, &mut Some(3)),
/// Poll::Ready(Err(SendError(3)))
/// );
/// Poll::Ready(())
/// })
/// .await;
/// });
/// ```
pub fn poll_send(
&mut self,
cx: &mut Context<'_>,
msg: &mut Option<T>,
) -> Poll<Result<(), SendError<T>>> {
// take() the message when calling this function for the first time.

if let Some(msg) = msg.take() {
self.sending_msg = Some(msg);
}

loop {
// If this sink is listening for events, first wait for a notification.
if let Some(listener) = &mut self.listener {
futures_core::ready!(Pin::new(listener).poll(cx));
self.listener = None;
}

loop {
let message = self.sending_msg.take().unwrap();
// Attempt to send the item immediately
match self.try_send(message) {
Ok(_) => {
// Great! The item has been sent sucessfully.
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Ok(()));
}
Err(e) => match e {
TrySendError::Full(item) => {
// The channel is full now.
// Store the item back to the struct for the next loop or polling.
self.sending_msg = Some(item);
}
TrySendError::Closed(item) => {
// The channel is closed.
// The stream is not blocked on an event - drop the listener.
self.listener = None;
return Poll::Ready(Err(SendError(item)));
}
},
}

// Receiving failed - now start listening for notifications or wait for one.
match &mut self.listener {
Some(_) => {
// Create a listener and try sending the message again.
break;
}
None => {
// Go back to the outer loop to poll the listener.
self.listener = Some(self.channel.sink_ops.listen());
}
}
}
}
}
}

impl<T> Drop for Sender<T> {
Expand Down Expand Up @@ -449,6 +548,8 @@ impl<T> Clone for Sender<T> {

Sender {
channel: self.channel.clone(),
listener: None,
sending_msg: None,
}
}
}
Expand Down Expand Up @@ -497,6 +598,8 @@ impl<T> Receiver<T> {
// message or gets canceled, it will notify another blocked send operation.
self.channel.send_ops.notify(1);

self.channel.sink_ops.notify(usize::MAX);

Ok(msg)
}
Err(PopError::Empty) => Err(TryRecvError::Empty),
Expand Down Expand Up @@ -725,6 +828,7 @@ impl<T> Drop for Receiver<T> {
if self.channel.receiver_count.fetch_sub(1, Ordering::AcqRel) == 1 {
self.channel.close();
}
self.channel.sink_ops.notify(usize::MAX);
}
}

Expand Down
105 changes: 105 additions & 0 deletions tests/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,108 @@ fn mpmc_stream() {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}

#[test]
fn poll_send() {
let (mut s, r) = bounded::<u32>(1);

Parallel::new()
.add(|| {
future::block_on(async {
future::poll_fn(|cx| s.poll_send(cx, &mut Some(7u32)))
.await
.unwrap();
});
sleep(ms(1000));
future::block_on(async {
future::poll_fn(|cx| s.poll_send(cx, &mut Some(8u32)))
.await
.unwrap();
});
sleep(ms(1000));
future::block_on(async {
future::poll_fn(|cx| s.poll_send(cx, &mut Some(9u32)))
.await
.unwrap();
});
sleep(ms(1000));
future::block_on(async {
future::poll_fn(|cx| s.poll_send(cx, &mut Some(10u32)))
.await
.unwrap();
});
})
.add(|| {
sleep(ms(1500));
assert_eq!(future::block_on(r.recv()), Ok(7));
assert_eq!(future::block_on(r.recv()), Ok(8));
assert_eq!(future::block_on(r.recv()), Ok(9));
})
.run();
}

#[test]
fn spsc_poll_send() {
const COUNT: usize = 25_000;

let (s, r) = bounded::<usize>(3);

Parallel::new()
.add({
let mut r = r.clone();
move || {
for _ in 0..COUNT {
future::block_on(r.next()).unwrap();
}
}
})
.add(|| {
let s = s.clone();
for i in 0..COUNT {
let mut s = s.clone();
future::block_on(async {
future::poll_fn(|cx| s.poll_send(cx, &mut Some(i)))
.await
.unwrap();
});
}
})
.run();
}

#[test]
fn mpmc_poll_send() {
const COUNT: usize = 25_000;
const THREADS: usize = 4;

let (s, r) = bounded::<usize>(3);
let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
let v = &v;

Parallel::new()
.each(0..THREADS, {
let mut r = r.clone();
move |_| {
for _ in 0..COUNT {
let n = future::block_on(r.next()).unwrap();
v[n].fetch_add(1, Ordering::SeqCst);
}
}
})
.each(0..THREADS, |_| {
let s = s.clone();
for i in 0..COUNT {
let mut s = s.clone();
future::block_on(async {
future::poll_fn(|cx| s.poll_send(cx, &mut Some(i)))
.await
.unwrap();
});
}
})
.run();

for c in v {
assert_eq!(c.load(Ordering::SeqCst), THREADS);
}
}
Loading