Skip to content

Commit ef6562c

Browse files
committed
Add usage example
this is exactly the quinn benchmark from https://github.com/quinn-rs/quinn/tree/main/bench, but adapted to use quinn-noise.
1 parent cf65b48 commit ef6562c

File tree

6 files changed

+547
-1
lines changed

6 files changed

+547
-1
lines changed

Diff for: .gitignore

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
/target
1+
/**/target
22
Cargo.lock

Diff for: Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,6 @@ tracing = "0.1.26"
2121
x25519-dalek = "1.1.1"
2222
xoodoo = "0.1.0"
2323
zeroize = "1.3.0"
24+
25+
[workspace]
26+
members = ["examples/bench"]

Diff for: examples/bench/Cargo.toml

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "bench"
3+
version = "0.1.0"
4+
edition = "2018"
5+
license = "MIT OR Apache-2.0"
6+
publish = false
7+
8+
[dependencies]
9+
anyhow = "1.0.22"
10+
bytes = "1"
11+
hdrhistogram = { version = "7.2", default-features = false }
12+
quinn = "0.9.1"
13+
clap = { version = "3.2", features = ["derive"] }
14+
tokio = { version = "1.0.1", features = ["rt", "sync"] }
15+
tracing = "0.1.10"
16+
tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "ansi", "time", "local-time"] }
17+
quinn-noise = { path = "../.." }
18+
rand = "0.7.3"
19+
ed25519-dalek = "1.0.1"

Diff for: examples/bench/src/bin/bulk.rs

+212
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
use std::{
2+
net::SocketAddr,
3+
sync::{Arc, Mutex},
4+
time::Instant,
5+
};
6+
7+
use anyhow::{Context, Result};
8+
use clap::Parser;
9+
use tokio::sync::Semaphore;
10+
use tracing::{info, trace};
11+
12+
use bench::{
13+
configure_tracing_subscriber, connect_client, drain_stream, rt, send_data_on_stream,
14+
server_endpoint,
15+
stats::{Stats, TransferResult},
16+
Opt,
17+
};
18+
19+
fn main() {
20+
let opt = Opt::parse();
21+
configure_tracing_subscriber();
22+
23+
let mut csprng = rand::rngs::OsRng {};
24+
let keypair: ed25519_dalek::Keypair = ed25519_dalek::Keypair::generate(&mut csprng);
25+
let public_key: ed25519_dalek::PublicKey = keypair.public;
26+
27+
let runtime = rt();
28+
let (server_addr, endpoint) = server_endpoint(&runtime, keypair, &opt);
29+
30+
let server_thread = std::thread::spawn(move || {
31+
if let Err(e) = runtime.block_on(server(endpoint, opt)) {
32+
eprintln!("server failed: {:#}", e);
33+
}
34+
});
35+
36+
let mut handles = Vec::new();
37+
for _ in 0..opt.clients {
38+
handles.push(std::thread::spawn(move || {
39+
let runtime = rt();
40+
match runtime.block_on(client(server_addr, public_key, opt)) {
41+
Ok(stats) => Ok(stats),
42+
Err(e) => {
43+
eprintln!("client failed: {:#}", e);
44+
Err(e)
45+
}
46+
}
47+
}));
48+
}
49+
50+
for (id, handle) in handles.into_iter().enumerate() {
51+
// We print all stats at the end of the test sequentially to avoid
52+
// them being garbled due to being printed concurrently
53+
if let Ok(stats) = handle.join().expect("client thread") {
54+
stats.print(id);
55+
}
56+
}
57+
58+
server_thread.join().expect("server thread");
59+
}
60+
61+
async fn server(endpoint: quinn::Endpoint, opt: Opt) -> Result<()> {
62+
let mut server_tasks = Vec::new();
63+
64+
// Handle only the expected amount of clients
65+
for _ in 0..opt.clients {
66+
let handshake = endpoint.accept().await.unwrap();
67+
let connection = handshake.await.context("handshake failed")?;
68+
69+
server_tasks.push(tokio::spawn(async move {
70+
loop {
71+
let (mut send_stream, mut recv_stream) = match connection.accept_bi().await {
72+
Err(quinn::ConnectionError::ApplicationClosed(_)) => break,
73+
Err(e) => {
74+
eprintln!("accepting stream failed: {:?}", e);
75+
break;
76+
}
77+
Ok(stream) => stream,
78+
};
79+
trace!("stream established");
80+
81+
let _: tokio::task::JoinHandle<Result<()>> = tokio::spawn(async move {
82+
drain_stream(&mut recv_stream, opt.read_unordered).await?;
83+
send_data_on_stream(&mut send_stream, opt.download_size).await?;
84+
Ok(())
85+
});
86+
}
87+
88+
if opt.stats {
89+
println!("\nServer connection stats:\n{:#?}", connection.stats());
90+
}
91+
}));
92+
}
93+
94+
// Await all the tasks. We have to do this to prevent the runtime getting dropped
95+
// and all server tasks to be cancelled
96+
for handle in server_tasks {
97+
if let Err(e) = handle.await {
98+
eprintln!("Server task error: {:?}", e);
99+
};
100+
}
101+
102+
Ok(())
103+
}
104+
105+
async fn client(
106+
server_addr: SocketAddr,
107+
remote_public_key: ed25519_dalek::PublicKey,
108+
opt: Opt,
109+
) -> Result<ClientStats> {
110+
let (endpoint, connection) = connect_client(server_addr, remote_public_key, opt).await?;
111+
112+
let start = Instant::now();
113+
114+
let connection = Arc::new(connection);
115+
116+
let mut stats = ClientStats::default();
117+
let mut first_error = None;
118+
119+
let sem = Arc::new(Semaphore::new(opt.max_streams));
120+
let results = Arc::new(Mutex::new(Vec::new()));
121+
for _ in 0..opt.streams {
122+
let permit = sem.clone().acquire_owned().await.unwrap();
123+
let results = results.clone();
124+
let connection = connection.clone();
125+
tokio::spawn(async move {
126+
let result =
127+
handle_client_stream(connection, opt.upload_size, opt.read_unordered).await;
128+
info!("stream finished: {:?}", result);
129+
results.lock().unwrap().push(result);
130+
drop(permit);
131+
});
132+
}
133+
134+
// Wait for remaining streams to finish
135+
let _ = sem.acquire_many(opt.max_streams as u32).await.unwrap();
136+
137+
for result in results.lock().unwrap().drain(..) {
138+
match result {
139+
Ok((upload_result, download_result)) => {
140+
stats.upload_stats.stream_finished(upload_result);
141+
stats.download_stats.stream_finished(download_result);
142+
}
143+
Err(e) => {
144+
if first_error.is_none() {
145+
first_error = Some(e);
146+
}
147+
}
148+
}
149+
}
150+
151+
stats.upload_stats.total_duration = start.elapsed();
152+
stats.download_stats.total_duration = start.elapsed();
153+
154+
// Explicit close of the connection, since handles can still be around due
155+
// to `Arc`ing them
156+
connection.close(0u32.into(), b"Benchmark done");
157+
158+
endpoint.wait_idle().await;
159+
160+
if opt.stats {
161+
println!("\nClient connection stats:\n{:#?}", connection.stats());
162+
}
163+
164+
match first_error {
165+
None => Ok(stats),
166+
Some(e) => Err(e),
167+
}
168+
}
169+
170+
async fn handle_client_stream(
171+
connection: Arc<quinn::Connection>,
172+
upload_size: u64,
173+
read_unordered: bool,
174+
) -> Result<(TransferResult, TransferResult)> {
175+
let start = Instant::now();
176+
177+
let (mut send_stream, mut recv_stream) = connection
178+
.open_bi()
179+
.await
180+
.context("failed to open stream")?;
181+
182+
send_data_on_stream(&mut send_stream, upload_size).await?;
183+
184+
let upload_result = TransferResult::new(start.elapsed(), upload_size);
185+
186+
let start = Instant::now();
187+
let size = drain_stream(&mut recv_stream, read_unordered).await?;
188+
let download_result = TransferResult::new(start.elapsed(), size as u64);
189+
190+
Ok((upload_result, download_result))
191+
}
192+
193+
#[derive(Default)]
194+
struct ClientStats {
195+
upload_stats: Stats,
196+
download_stats: Stats,
197+
}
198+
199+
impl ClientStats {
200+
pub fn print(&self, client_id: usize) {
201+
println!();
202+
println!("Client {} stats:", client_id);
203+
204+
if self.upload_stats.total_size != 0 {
205+
self.upload_stats.print("upload");
206+
}
207+
208+
if self.download_stats.total_size != 0 {
209+
self.download_stats.print("download");
210+
}
211+
}
212+
}

0 commit comments

Comments
 (0)