Skip to content

Commit 5824a90

Browse files
committed
Add fairness to futures_util::{select, try_select, select_ok}, fixes rust-lang#2135
1 parent 67462f7 commit 5824a90

File tree

8 files changed

+161
-32
lines changed

8 files changed

+161
-32
lines changed

futures-util/Cargo.toml

+4-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ Common utilities and extension traits for the futures-rs library.
1111
"""
1212

1313
[features]
14-
default = ["std", "async-await", "async-await-macro"]
15-
std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
14+
default = ["std", "getrandom", "async-await", "async-await-macro"]
15+
std = ["alloc", "futures-core/std", "futures-task/std", "rand/std", "rand/std_rng", "slab"]
16+
getrandom = ["rand/getrandom"]
1617
alloc = ["futures-core/alloc", "futures-task/alloc"]
1718
async-await = []
1819
async-await-macro = ["async-await", "futures-macro"]
@@ -42,6 +43,7 @@ futures_01 = { version = "0.1.25", optional = true, package = "futures" }
4243
tokio-io = { version = "0.1.9", optional = true }
4344
pin-utils = "0.1.0"
4445
pin-project-lite = "0.2.6"
46+
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }
4547

4648
[dev-dependencies]
4749
futures = { path = "../futures", features = ["async-await", "thread-pool"] }

futures-util/src/future/select.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ use crate::future::{Either, FutureExt};
33
use core::pin::Pin;
44
use futures_core::future::{FusedFuture, Future};
55
use futures_core::task::{Context, Poll};
6+
use rand::rngs::SmallRng;
7+
use rand::Rng;
68

79
/// Future for the [`select()`] function.
810
#[must_use = "futures do nothing unless you `.await` or poll them"]
911
#[derive(Debug)]
1012
pub struct Select<A, B> {
1113
inner: Option<(A, B)>,
14+
rng: SmallRng,
1215
}
1316

1417
impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
@@ -22,6 +25,9 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
2225
/// Note that this function consumes the receiving futures and returns a
2326
/// wrapped version of them.
2427
///
28+
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
29+
/// selected.
30+
///
2531
/// Also note that if both this and the second future have the same
2632
/// output type you can use the `Either::factor_first` method to
2733
/// conveniently extract out the value at the end.
@@ -88,6 +94,7 @@ where
8894
{
8995
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
9096
inner: Some((future1, future2)),
97+
rng: crate::gen_rng(),
9198
})
9299
}
93100

@@ -101,14 +108,21 @@ where
101108
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
102109
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
103110

104-
if let Poll::Ready(val) = a.poll_unpin(cx) {
105-
return Poll::Ready(Either::Left((val, b)));
111+
macro_rules! poll_wrap {
112+
($to_poll:expr, $unpolled:expr, $wrap:expr) => {
113+
if let Poll::Ready(val) = $to_poll.poll_unpin(cx) {
114+
return Poll::Ready($wrap((val, $unpolled)));
115+
}
116+
};
106117
}
107118

108-
if let Poll::Ready(val) = b.poll_unpin(cx) {
109-
return Poll::Ready(Either::Right((val, a)));
119+
if self.rng.gen::<bool>() {
120+
poll_wrap!(a, b, Either::Left);
121+
poll_wrap!(b, a, Either::Right);
122+
} else {
123+
poll_wrap!(b, a, Either::Right);
124+
poll_wrap!(a, b, Either::Left);
110125
}
111-
112126
self.inner = Some((a, b));
113127
Poll::Pending
114128
}

futures-util/src/future/select_ok.rs

+23-8
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@ use core::mem;
66
use core::pin::Pin;
77
use futures_core::future::{Future, TryFuture};
88
use futures_core::task::{Context, Poll};
9+
use rand::prelude::SliceRandom;
10+
use rand::rngs::SmallRng;
911

1012
/// Future for the [`select_ok`] function.
1113
#[derive(Debug)]
1214
#[must_use = "futures do nothing unless you `.await` or poll them"]
1315
pub struct SelectOk<Fut> {
1416
inner: Vec<Fut>,
17+
rng: SmallRng,
1518
}
1619

1720
impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
@@ -26,6 +29,17 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
2629
/// This function is only available when the `std` or `alloc` feature of this
2730
/// library is activated, and it is activated by default.
2831
///
32+
/// # Note for users migrating from 0.3 to 0.4
33+
///
34+
/// This function used to be biased in favor of futures that appeared earlier in the
35+
/// iterator. This is no longer the case, the futures are now shuffled prior to being polled.
36+
/// This prevents starvation problems. It also has the side effect that the returned `Vec`
37+
/// of remaining futures may be longer than it was in version 0.3, because of this shuffling.
38+
/// Some futures that would have been polled and had errors get dropped, may now instead
39+
/// remain in the collection without being polled.
40+
///
41+
/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro.
42+
///
2943
/// # Panics
3044
///
3145
/// This function will panic if the iterator specified contains no items.
@@ -34,7 +48,7 @@ where
3448
I: IntoIterator,
3549
I::Item: TryFuture + Unpin,
3650
{
37-
let ret = SelectOk { inner: iter.into_iter().collect() };
51+
let ret = SelectOk { inner: iter.into_iter().collect(), rng: crate::gen_rng() };
3852
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
3953
assert_future::<
4054
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
@@ -46,24 +60,25 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
4660
type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;
4761

4862
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
63+
let Self { inner, rng } = &mut *self;
64+
inner.shuffle(rng);
4965
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
5066
loop {
51-
let item =
52-
self.inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
53-
Poll::Pending => None,
54-
Poll::Ready(e) => Some((i, e)),
55-
});
67+
let item = inner.iter_mut().enumerate().find_map(|(i, f)| match f.try_poll_unpin(cx) {
68+
Poll::Pending => None,
69+
Poll::Ready(e) => Some((i, e)),
70+
});
5671
match item {
5772
Some((idx, res)) => {
5873
// always remove Ok or Err, if it's not the last Err continue looping
59-
drop(self.inner.remove(idx));
74+
drop(inner.remove(idx));
6075
match res {
6176
Ok(e) => {
6277
let rest = mem::take(&mut self.inner);
6378
return Poll::Ready(Ok((e, rest)));
6479
}
6580
Err(e) => {
66-
if self.inner.is_empty() {
81+
if inner.is_empty() {
6782
return Poll::Ready(Err(e));
6883
}
6984
}

futures-util/src/future/try_select.rs

+23-10
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@ use crate::future::{Either, TryFutureExt};
22
use core::pin::Pin;
33
use futures_core::future::{Future, TryFuture};
44
use futures_core::task::{Context, Poll};
5+
use rand::rngs::SmallRng;
6+
use rand::Rng;
57

68
/// Future for the [`try_select()`] function.
79
#[must_use = "futures do nothing unless you `.await` or poll them"]
810
#[derive(Debug)]
911
pub struct TrySelect<A, B> {
1012
inner: Option<(A, B)>,
13+
rng: SmallRng,
1114
}
1215

1316
impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
@@ -57,6 +60,7 @@ where
5760
{
5861
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
5962
inner: Some((future1, future2)),
63+
rng: crate::gen_rng(),
6064
})
6165
}
6266

@@ -69,17 +73,26 @@ where
6973

7074
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
7175
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
72-
match a.try_poll_unpin(cx) {
73-
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Left((x, b)))),
74-
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Left((x, b)))),
75-
Poll::Pending => match b.try_poll_unpin(cx) {
76-
Poll::Ready(Err(x)) => Poll::Ready(Err(Either::Right((x, a)))),
77-
Poll::Ready(Ok(x)) => Poll::Ready(Ok(Either::Right((x, a)))),
78-
Poll::Pending => {
79-
self.inner = Some((a, b));
80-
Poll::Pending
76+
macro_rules! poll_wrap {
77+
($poll_first:expr, $poll_second:expr, $wrap_first:expr, $wrap_second:expr) => {
78+
match $poll_first.try_poll_unpin(cx) {
79+
Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_first((x, $poll_second)))),
80+
Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_first((x, $poll_second)))),
81+
Poll::Pending => match $poll_second.try_poll_unpin(cx) {
82+
Poll::Ready(Err(x)) => Poll::Ready(Err($wrap_second((x, $poll_first)))),
83+
Poll::Ready(Ok(x)) => Poll::Ready(Ok($wrap_second((x, $poll_first)))),
84+
Poll::Pending => {
85+
self.inner = Some((a, b));
86+
Poll::Pending
87+
}
88+
},
8189
}
82-
},
90+
};
91+
}
92+
if self.rng.gen::<bool>() {
93+
poll_wrap!(a, b, Either::Left, Either::Right)
94+
} else {
95+
poll_wrap!(b, a, Either::Right, Either::Left)
8396
}
8497
}
8598
}

futures-util/src/lib.rs

+14
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ extern crate alloc;
2929
// Macro re-exports
3030
pub use futures_core::ready;
3131
pub use pin_utils::pin_mut;
32+
use rand::rngs::SmallRng;
33+
use rand::SeedableRng;
3234

3335
#[cfg(feature = "async-await")]
3436
#[macro_use]
@@ -334,3 +336,15 @@ mod abortable;
334336

335337
mod fns;
336338
mod unfold_state;
339+
340+
fn gen_rng() -> SmallRng {
341+
#[cfg(feature = "std")] {
342+
SmallRng::from_rng(rand::thread_rng()).expect("generating SmallRng via thread_rng failed")
343+
}
344+
#[cfg(all(feature = "getrandom", not(feature = "std")))] {
345+
SmallRng::from_entropy()
346+
}
347+
#[cfg(not(any(feature = "getrandom", feature = "std")))] {
348+
SmallRng::seed_from_u64(0)
349+
}
350+
}

futures/tests/future_select.rs

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use std::future::ready;
2+
3+
use futures::future::select;
4+
use futures_executor::block_on;
5+
6+
7+
8+
#[test]
9+
fn is_fair() {
10+
let mut results = Vec::with_capacity(100);
11+
for _ in 0..100 {
12+
let (i, _) = block_on(select(ready(0), ready(1))).factor_first();
13+
results.push(i);
14+
}
15+
const THRESHOLD: usize = 30;
16+
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
17+
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
18+
}

futures/tests/future_select_ok.rs

+44-7
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,67 @@
1+
use std::fmt::Debug;
2+
use std::time::Duration;
3+
14
use futures::executor::block_on;
2-
use futures::future::{err, ok, select_ok};
5+
use futures::future::{err, ok, select_ok, Future};
6+
use futures_channel::oneshot;
7+
use std::thread;
38

49
#[test]
510
fn ignore_err() {
611
let v = vec![err(1), err(2), ok(3), ok(4)];
712

813
let (i, v) = block_on(select_ok(v)).ok().unwrap();
9-
assert_eq!(i, 3);
14+
assert!(i == 3 || i == 4);
1015

11-
assert_eq!(v.len(), 1);
16+
assert!(v.len() < 4);
1217

13-
let (i, v) = block_on(select_ok(v)).ok().unwrap();
14-
assert_eq!(i, 4);
18+
let (j, v) = block_on(select_ok(v)).ok().unwrap();
19+
assert!(j == 3 || j == 4);
20+
assert_ne!(j, i);
1521

16-
assert!(v.is_empty());
22+
assert!(v.len() < 3);
1723
}
1824

1925
#[test]
2026
fn last_err() {
21-
let v = vec![ok(1), err(2), err(3)];
27+
let (ok_sender, ok_receiver) = oneshot::channel();
28+
let (first_err_sender, first_err_receiver) = oneshot::channel();
29+
let (second_err_sender, second_err_receiver) = oneshot::channel();
30+
async fn await_unwrap<T, E: Debug>(o: impl Future<Output = Result<T, E>>) -> T {
31+
o.await.unwrap()
32+
}
33+
let v = vec![
34+
Box::pin(await_unwrap(ok_receiver)),
35+
Box::pin(await_unwrap(first_err_receiver)),
36+
Box::pin(await_unwrap(second_err_receiver)),
37+
];
38+
ok_sender.send(Ok(1)).unwrap();
2239

2340
let (i, v) = block_on(select_ok(v)).ok().unwrap();
2441
assert_eq!(i, 1);
2542

2643
assert_eq!(v.len(), 2);
44+
first_err_sender.send(Err(2)).unwrap();
45+
46+
thread::spawn(move || {
47+
thread::sleep(Duration::from_millis(100));
48+
second_err_sender.send(Err(3)).unwrap();
49+
});
2750

2851
let i = block_on(select_ok(v)).err().unwrap();
2952
assert_eq!(i, 3);
3053
}
54+
55+
#[test]
56+
fn is_fair() {
57+
let mut results = Vec::with_capacity(100);
58+
for _ in 0..100 {
59+
let v = vec![err(1), err(2), ok(3), ok(4)];
60+
61+
let (i, _v) = block_on(select_ok(v)).ok().unwrap();
62+
results.push(i);
63+
}
64+
const THRESHOLD: usize = 30;
65+
assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD);
66+
assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD);
67+
}

futures/tests/future_try_select.rs

+16
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
use futures::future::{ok, try_select};
2+
use futures_executor::block_on;
3+
4+
5+
6+
#[test]
7+
fn is_fair() {
8+
let mut results = Vec::with_capacity(100);
9+
for _ in 0..100 {
10+
let (i, _) = block_on(try_select(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first();
11+
results.push(i);
12+
}
13+
const THRESHOLD: usize = 30;
14+
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
15+
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
16+
}

0 commit comments

Comments
 (0)