Skip to content

Commit 93d8638

Browse files
refactor: move more tests to poll-API based interface (#158)
1 parent fe95d02 commit 93d8638

File tree

3 files changed

+156
-127
lines changed

3 files changed

+156
-127
lines changed

test-harness/src/lib.rs

+137-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1+
use futures::future::BoxFuture;
2+
use futures::stream::FuturesUnordered;
13
use futures::{
24
future, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt, TryStreamExt,
35
};
4-
use futures::{stream, Stream};
6+
use futures::{stream, FutureExt, Stream};
57
use quickcheck::{Arbitrary, Gen};
6-
use std::io;
8+
use std::future::Future;
79
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
10+
use std::pin::Pin;
11+
use std::task::{Context, Poll};
12+
use std::{fmt, io, mem};
813
use tokio::net::{TcpListener, TcpStream};
914
use tokio_util::compat::{Compat, TokioAsyncReadCompatExt};
1015
use yamux::ConnectionError;
@@ -91,6 +96,136 @@ pub async fn send_recv_message(stream: &mut yamux::Stream, Msg(msg): Msg) -> io:
9196
Ok(())
9297
}
9398

99+
pub struct EchoServer<T> {
100+
connection: Connection<T>,
101+
worker_streams: FuturesUnordered<BoxFuture<'static, yamux::Result<()>>>,
102+
streams_processed: usize,
103+
connection_closed: bool,
104+
}
105+
106+
impl<T> EchoServer<T> {
107+
pub fn new(connection: Connection<T>) -> Self {
108+
Self {
109+
connection,
110+
worker_streams: FuturesUnordered::default(),
111+
streams_processed: 0,
112+
connection_closed: false,
113+
}
114+
}
115+
}
116+
117+
impl<T> Future for EchoServer<T>
118+
where
119+
T: AsyncRead + AsyncWrite + Unpin,
120+
{
121+
type Output = yamux::Result<usize>;
122+
123+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
124+
let this = self.get_mut();
125+
126+
loop {
127+
match this.worker_streams.poll_next_unpin(cx) {
128+
Poll::Ready(Some(Ok(()))) => {
129+
this.streams_processed += 1;
130+
continue;
131+
}
132+
Poll::Ready(Some(Err(e))) => {
133+
eprintln!("A stream failed: {}", e);
134+
continue;
135+
}
136+
Poll::Ready(None) => {
137+
if this.connection_closed {
138+
return Poll::Ready(Ok(this.streams_processed));
139+
}
140+
}
141+
Poll::Pending => {}
142+
}
143+
144+
match this.connection.poll_next_inbound(cx) {
145+
Poll::Ready(Some(Ok(mut stream))) => {
146+
this.worker_streams.push(
147+
async move {
148+
{
149+
let (mut r, mut w) = AsyncReadExt::split(&mut stream);
150+
futures::io::copy(&mut r, &mut w).await?;
151+
}
152+
stream.close().await?;
153+
Ok(())
154+
}
155+
.boxed(),
156+
);
157+
continue;
158+
}
159+
Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
160+
this.connection_closed = true;
161+
continue;
162+
}
163+
Poll::Pending => {}
164+
}
165+
166+
return Poll::Pending;
167+
}
168+
}
169+
}
170+
171+
#[derive(Debug)]
172+
pub struct OpenStreamsClient<T> {
173+
connection: Option<Connection<T>>,
174+
streams: Vec<yamux::Stream>,
175+
to_open: usize,
176+
}
177+
178+
impl<T> OpenStreamsClient<T> {
179+
pub fn new(connection: Connection<T>, to_open: usize) -> Self {
180+
Self {
181+
connection: Some(connection),
182+
streams: vec![],
183+
to_open,
184+
}
185+
}
186+
}
187+
188+
impl<T> Future for OpenStreamsClient<T>
189+
where
190+
T: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
191+
{
192+
type Output = yamux::Result<(Connection<T>, Vec<yamux::Stream>)>;
193+
194+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
195+
let this = self.get_mut();
196+
let connection = this.connection.as_mut().unwrap();
197+
198+
loop {
199+
// Drive connection to make progress.
200+
match connection.poll_next_inbound(cx)? {
201+
Poll::Ready(_stream) => {
202+
panic!("Unexpected inbound stream");
203+
}
204+
Poll::Pending => {}
205+
}
206+
207+
if this.streams.len() < this.to_open {
208+
match connection.poll_new_outbound(cx)? {
209+
Poll::Ready(stream) => {
210+
this.streams.push(stream);
211+
continue;
212+
}
213+
Poll::Pending => {}
214+
}
215+
}
216+
217+
if this.streams.len() == this.to_open {
218+
return Poll::Ready(Ok((
219+
this.connection.take().unwrap(),
220+
mem::take(&mut this.streams),
221+
)));
222+
}
223+
224+
return Poll::Pending;
225+
}
226+
}
227+
}
228+
94229
#[derive(Clone, Debug)]
95230
pub struct Msg(pub Vec<u8>);
96231

test-harness/tests/poll_api.rs

+19-66
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use futures::future::BoxFuture;
22
use futures::stream::FuturesUnordered;
3-
use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
3+
use futures::{AsyncRead, AsyncWrite, AsyncWriteExt, FutureExt, StreamExt};
44
use quickcheck::QuickCheck;
55
use std::future::Future;
66
use std::pin::Pin;
77
use std::task::{Context, Poll};
88
use test_harness::*;
99
use tokio::net::TcpStream;
1010
use tokio::runtime::Runtime;
11+
use tokio::task;
1112
use tokio_util::compat::TokioAsyncReadCompatExt;
12-
use yamux::{Connection, Mode};
13+
use yamux::{Config, Connection, ConnectionError, Mode};
1314

1415
#[test]
1516
fn prop_config_send_recv_multi() {
@@ -46,76 +47,28 @@ fn prop_config_send_recv_multi() {
4647
QuickCheck::new().quickcheck(prop as fn(_, _, _) -> _)
4748
}
4849

49-
struct EchoServer<T> {
50-
connection: Connection<T>,
51-
worker_streams: FuturesUnordered<BoxFuture<'static, yamux::Result<()>>>,
52-
streams_processed: usize,
53-
connection_closed: bool,
54-
}
50+
#[test]
51+
fn prop_max_streams() {
52+
fn prop(n: usize) -> Result<bool, ConnectionError> {
53+
let max_streams = n % 100;
54+
let mut cfg = Config::default();
55+
cfg.set_max_num_streams(max_streams);
5556

56-
impl<T> EchoServer<T> {
57-
fn new(connection: Connection<T>) -> Self {
58-
Self {
59-
connection,
60-
worker_streams: FuturesUnordered::default(),
61-
streams_processed: 0,
62-
connection_closed: false,
63-
}
64-
}
65-
}
57+
Runtime::new().unwrap().block_on(async move {
58+
let (server, client) = connected_peers(cfg.clone(), cfg).await?;
6659

67-
impl<T> Future for EchoServer<T>
68-
where
69-
T: AsyncRead + AsyncWrite + Unpin,
70-
{
71-
type Output = yamux::Result<usize>;
60+
task::spawn(EchoServer::new(server));
7261

73-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
74-
let this = self.get_mut();
62+
let client = OpenStreamsClient::new(client, max_streams);
7563

76-
loop {
77-
match this.worker_streams.poll_next_unpin(cx) {
78-
Poll::Ready(Some(Ok(()))) => {
79-
this.streams_processed += 1;
80-
continue;
81-
}
82-
Poll::Ready(Some(Err(e))) => {
83-
eprintln!("A stream failed: {}", e);
84-
continue;
85-
}
86-
Poll::Ready(None) => {
87-
if this.connection_closed {
88-
return Poll::Ready(Ok(this.streams_processed));
89-
}
90-
}
91-
Poll::Pending => {}
92-
}
93-
94-
match this.connection.poll_next_inbound(cx) {
95-
Poll::Ready(Some(Ok(mut stream))) => {
96-
this.worker_streams.push(
97-
async move {
98-
{
99-
let (mut r, mut w) = AsyncReadExt::split(&mut stream);
100-
futures::io::copy(&mut r, &mut w).await?;
101-
}
102-
stream.close().await?;
103-
Ok(())
104-
}
105-
.boxed(),
106-
);
107-
continue;
108-
}
109-
Poll::Ready(None) | Poll::Ready(Some(Err(_))) => {
110-
this.connection_closed = true;
111-
continue;
112-
}
113-
Poll::Pending => {}
114-
}
64+
let (client, streams) = client.await?;
65+
assert_eq!(streams.len(), max_streams);
11566

116-
return Poll::Pending;
117-
}
67+
let open_result = OpenStreamsClient::new(client, 1).await;
68+
Ok(matches!(open_result, Err(ConnectionError::TooManyStreams)))
69+
})
11870
}
71+
QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _)
11972
}
12073

12174
struct MessageSender<T> {

test-harness/tests/tests.rs

-59
Original file line numberDiff line numberDiff line change
@@ -54,37 +54,6 @@ fn prop_config_send_recv_single() {
5454
.quickcheck(prop as fn(_, _, _) -> _)
5555
}
5656

57-
#[test]
58-
fn prop_config_send_recv_multi() {
59-
fn prop(
60-
mut msgs: Vec<Msg>,
61-
TestConfig(cfg1): TestConfig,
62-
TestConfig(cfg2): TestConfig,
63-
) -> Result<(), ConnectionError> {
64-
msgs.insert(0, Msg(vec![1u8; yamux::DEFAULT_CREDIT as usize]));
65-
66-
Runtime::new().unwrap().block_on(async move {
67-
let (server, client) = connected_peers(cfg1, cfg2).await?;
68-
69-
let server = echo_server(server);
70-
let client = async {
71-
let (control, client) = Control::new(client);
72-
task::spawn(noop_server(client));
73-
send_on_separate_streams(control, msgs).await?;
74-
75-
Ok(())
76-
};
77-
78-
futures::future::try_join(server, client).await?;
79-
80-
Ok(())
81-
})
82-
}
83-
QuickCheck::new()
84-
.tests(10)
85-
.quickcheck(prop as fn(_, _, _) -> _)
86-
}
87-
8857
#[test]
8958
fn prop_send_recv() {
9059
fn prop(msgs: Vec<Msg>) -> Result<TestResult, ConnectionError> {
@@ -112,34 +81,6 @@ fn prop_send_recv() {
11281
QuickCheck::new().tests(1).quickcheck(prop as fn(_) -> _)
11382
}
11483

115-
#[test]
116-
fn prop_max_streams() {
117-
fn prop(n: usize) -> Result<bool, ConnectionError> {
118-
let max_streams = n % 100;
119-
let mut cfg = Config::default();
120-
cfg.set_max_num_streams(max_streams);
121-
122-
Runtime::new().unwrap().block_on(async move {
123-
let (server, client) = connected_peers(cfg.clone(), cfg).await?;
124-
125-
task::spawn(echo_server(server));
126-
127-
let (mut control, client) = Control::new(client);
128-
task::spawn(noop_server(client));
129-
130-
let mut v = Vec::new();
131-
for _ in 0..max_streams {
132-
v.push(control.open_stream().await?)
133-
}
134-
135-
let open_result = control.open_stream().await;
136-
137-
Ok(matches!(open_result, Err(ConnectionError::TooManyStreams)))
138-
})
139-
}
140-
QuickCheck::new().tests(7).quickcheck(prop as fn(_) -> _)
141-
}
142-
14384
#[test]
14485
fn prop_send_recv_half_closed() {
14586
fn prop(msg: Msg) -> Result<(), ConnectionError> {

0 commit comments

Comments
 (0)