Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce biased argument for select! #3603

Merged
merged 7 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 93 additions & 26 deletions tokio/src/macros/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,24 @@
///
/// ### Fairness
///
/// `select!` randomly picks a branch to check first. This provides some level
/// of fairness when calling `select!` in a loop with branches that are always
/// By default, `select!` randomly picks a branch to check first. This provides
/// some level of fairness when calling `select!` in a loop with branches that
/// are always ready.
///
/// This behavior can be overridden by adding `biased;` to the beginning of the
/// macro usage. See the exmples for details. This will cause `select` to poll

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: "examples" (typo)

/// the futures in the order they appear from top to bottom. There are a few
/// reasons you may want this:
///
/// - The random number generation of `tokio::select!` has a non-zero CPU cost
/// - Your futures may interact in a way where known polling order is significant
///
/// But there is an important caveat to this mode. It becomes your responsibility
/// to ensure that the polling order of your futures is fair. If for example you
/// are selecting between a stream and a shutdown future, and the stream has a
/// huge volume of messages and zero or nearly zero time between them, you should
/// place the shutdown future earlier in the `select!` list to ensure that it is
/// always polled, and will not be ignored due to the stream being constantly
/// ready.
///
/// # Panics
Expand Down Expand Up @@ -283,6 +299,45 @@
/// assert_eq!(res.1, "second");
/// }
/// ```
///
/// Using the `biased;` mode to control polling order.
///
/// ```
/// #[tokio::main]
/// async fn main() {
/// let mut count = 0u8;
///
/// loop {
/// tokio::select! {
/// // If you run this example without `biased;`, the polling order is
/// // psuedo-random, and the assertions on the value of count will
/// // (probably) fail.
/// biased;
///
/// _ = async {}, if count < 1 => {
/// count += 1;
/// assert_eq!(count, 1);
/// }
/// _ = async {}, if count < 2 => {
/// count += 1;
/// assert_eq!(count, 2);
/// }
/// _ = async {}, if count < 3 => {
/// count += 1;
/// assert_eq!(count, 3);
/// }
/// _ = async {}, if count < 4 => {
/// count += 1;
/// assert_eq!(count, 4);
/// }
///
/// else => {
/// break;
/// }
/// };
/// }
/// }
/// ```
#[macro_export]
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
macro_rules! select {
Expand All @@ -300,6 +355,10 @@ macro_rules! select {

// All input is normalized, now transform.
(@ {
// The index of the future to poll first (in bias mode), or the RNG
// expression to use to pick a future to poll first.
start=$start:expr;

// One `_` for each branch in the `select!` macro. Passing this to
// `count!` converts $skip to an integer.
( $($count:tt)* )
Expand Down Expand Up @@ -357,9 +416,11 @@ macro_rules! select {
// disabled.
let mut is_pending = false;

// Randomly generate a starting point. This makes `select!` a
// bit more fair and avoids always polling the first future.
let start = $crate::macros::support::thread_rng_n(BRANCHES);
// Choose a starting index to begin polling the futures at. In
// practice, this will either be a psuedo-randomly generrated
// number by default, or the constant 0 if `biased;` is
// supplied.
let start = $start;

for i in 0..BRANCHES {
let branch;
Expand Down Expand Up @@ -444,42 +505,48 @@ macro_rules! select {
// These rules match a single `select!` branch and normalize it for
// processing by the first rule.

(@ { $($t:tt)* } ) => {
(@ { start=$start:expr; $($t:tt)* } ) => {
// No `else` branch
$crate::select!(@{ $($t)*; panic!("all branches are disabled and there is no else branch") })
$crate::select!(@{ start=$start; $($t)*; panic!("all branches are disabled and there is no else branch") })
};
(@ { $($t:tt)* } else => $else:expr $(,)?) => {
$crate::select!(@{ $($t)*; $else })
(@ { start=$start:expr; $($t:tt)* } else => $else:expr $(,)?) => {
$crate::select!(@{ start=$start; $($t)*; $else })
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:block $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, })
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, })
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, })
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, })
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr, if $c:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if $c => $h, } $($r)*)
};
(@ { ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
(@ { start=$start:expr; ( $($s:tt)* ) $($t:tt)* } $p:pat = $f:expr => $h:expr, $($r:tt)* ) => {
$crate::select!(@{ start=$start; ($($s)* _) $($t)* ($($s)*) $p = $f, if true => $h, } $($r)*)
};

// ===== Entry point =====

(biased; $p:pat = $($t:tt)* ) => {
$crate::select!(@{ start=0; () } $p = $($t)*)
};

( $p:pat = $($t:tt)* ) => {
$crate::select!(@{ () } $p = $($t)*)
// Randomly generate a starting point. This makes `select!` a bit more
// fair and avoids always polling the first future.
$crate::select!(@{ start={ $crate::macros::support::thread_rng_n(BRANCHES) }; () } $p = $($t)*)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh, it surprises me that we can access BRANCHES here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It surprised me too, but I was very happy that my intuition that we couldn't was wrong 😄

};
() => {
compile_error!("select! requires at least one branch.")
Expand Down
76 changes: 76 additions & 0 deletions tokio/tests/macros_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,79 @@ async fn mut_on_left_hand_side() {
.await;
assert_eq!(v, 2);
}

#[tokio::test]
async fn biased_one_not_ready() {
let (_tx1, rx1) = oneshot::channel::<i32>();
let (tx2, rx2) = oneshot::channel::<i32>();
let (tx3, rx3) = oneshot::channel::<i32>();

tx2.send(2).unwrap();
tx3.send(3).unwrap();

let v = tokio::select! {
biased;

_ = rx1 => unreachable!(),
res = rx2 => {
assert_ok!(res)
},
_ = rx3 => {
panic!("This branch should never be activated because `rx2` should be polled before `rx3` due to `biased;`.")
}
};

assert_eq!(2, v);
}

#[tokio::test]
async fn biased_eventually_ready() {
use std::future::Future;

fn ready_on_2nd_poll() -> impl Future<Output = ()> + Unpin {
use std::task::Poll;

let mut polled = false;

poll_fn(move |ctx| {
if polled {
Poll::Ready(())
} else {
polled = true;
ctx.waker().wake_by_ref();

Poll::Pending
}
})
}

let one = async {};
let two = ready_on_2nd_poll();
let three = ready_on_2nd_poll();
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

let mut count = 0u8;

tokio::pin!(one, two, three);

loop {
tokio::select! {
biased;

_ = &mut two, if count < 2 => {
count += 1;
assert_eq!(count, 2);
}
_ = &mut three, if count < 3 => {
count += 1;
assert_eq!(count, 3);
}
_ = &mut one, if count < 1 => {
count += 1;
assert_eq!(count, 1);
}
else => break,
}
}

assert_eq!(count, 3);
}