Skip to content

Commit 5aad19d

Browse files
authored
feat: add strategy config to fluvio benchmark (#4338)
1 parent 69161a3 commit 5aad19d

File tree

8 files changed

+336
-101
lines changed

8 files changed

+336
-101
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,44 @@
1+
use std::{fs::File, time::Duration};
2+
13
use anyhow::Result;
4+
use fluvio_future::timer::sleep;
25

3-
use crate::{cli::BenchmarkCmd, producer_benchmark::ProducerBenchmark};
6+
use crate::{cli::BenchmarkMode, config::config_matrix::Matrix, producer_benchmark::ProducerBenchmark};
47

58
pub struct BenchmarkDriver {}
69

710
impl BenchmarkDriver {
8-
pub async fn run_benchmark(cmd: BenchmarkCmd) -> Result<()> {
9-
match cmd {
10-
BenchmarkCmd::Producer(config) => {
11+
pub async fn run_benchmark(mode: BenchmarkMode) -> Result<()> {
12+
match mode {
13+
BenchmarkMode::Producer(config) => {
1114
ProducerBenchmark::run_benchmark(config).await?;
1215
}
13-
BenchmarkCmd::Consumer(_) => {
16+
BenchmarkMode::Consumer(_) => {
1417
println!("consume not implemented");
1518
}
16-
}
19+
BenchmarkMode::Matrix { config } => {
20+
let matrix_config = if let Some(path) = config {
21+
let file = File::open(&path).expect("file not found");
22+
serde_yaml::from_reader::<_, Matrix>(file).expect("deserialization failed")
23+
} else {
24+
crate::config::config_matrix::default_config()
25+
};
26+
let benchmarks_configs = matrix_config.generate_configs();
27+
for benchmark_config in benchmarks_configs {
28+
println!("Running benchmark: {:#?}", benchmark_config);
29+
match benchmark_config {
30+
crate::config::BenchmarkConfig::Producer(producer) => {
31+
ProducerBenchmark::run_benchmark(producer).await?;
32+
}
33+
crate::config::BenchmarkConfig::Consumer(_) => {
34+
println!("consume not implemented");
35+
}
36+
}
1737

38+
sleep(Duration::from_secs(1)).await;
39+
}
40+
}
41+
}
1842
Ok(())
1943
}
2044
}

crates/fluvio-benchmark/src/cli.rs

+17-5
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@ use clap::Parser;
22
use anyhow::Result;
33

44
use crate::{
5-
config::{ConsumerConfig, ProducerConfig},
65
benchmark_driver::BenchmarkDriver,
6+
config::{ConsumerConfig, ProducerConfig},
77
};
88

99
#[derive(Debug, Parser)]
1010
pub struct BenchmarkOpt {
1111
#[clap(subcommand)]
12-
benchmark: BenchmarkCmd,
12+
benchmark: Option<BenchmarkMode>,
1313
}
1414
impl BenchmarkOpt {
1515
pub async fn process(self) -> Result<()> {
@@ -19,16 +19,28 @@ impl BenchmarkOpt {
1919
}
2020

2121
#[derive(Debug, Parser)]
22-
pub enum BenchmarkCmd {
22+
pub enum BenchmarkMode {
23+
/// Use a matrix configuration file
24+
Matrix {
25+
/// Path to the configuration file
26+
#[arg(short, long)]
27+
config: Option<String>,
28+
},
2329
/// Run a producer benchmark
2430
Producer(ProducerConfig),
2531
/// Run a consumer benchmark
2632
Consumer(ConsumerConfig),
2733
}
2834

29-
pub async fn run_benchmarks(args: BenchmarkOpt) -> Result<()> {
35+
pub async fn run_benchmarks(opt: BenchmarkOpt) -> Result<()> {
3036
println!("# Fluvio Benchmark Results");
31-
BenchmarkDriver::run_benchmark(args.benchmark).await?;
37+
38+
if let Some(mode) = opt.benchmark {
39+
BenchmarkDriver::run_benchmark(mode).await?;
40+
} else {
41+
BenchmarkDriver::run_benchmark(BenchmarkMode::Matrix { config: None }).await?;
42+
}
43+
3244
println!();
3345
Ok(())
3446
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
use std::time::Duration;
2+
3+
use fluvio::Compression;
4+
use serde::{Deserialize, Serialize};
5+
use bytesize::ByteSize;
6+
7+
use crate::config::{BenchmarkConfig, RecordKeyAllocationStrategy};
8+
9+
use super::{cross::CrossIterate, default_topic_name, ProducerConfigBuilder};
10+
11+
#[derive(Debug, Clone, Serialize, Deserialize)]
12+
pub struct Matrix {
13+
pub producer_config: Option<ProducerMatrixConfig>,
14+
pub consumer_config: Option<ConsumerMatrixConfig>,
15+
pub shared_config: SharedMatrixConfig,
16+
}
17+
18+
#[derive(Debug, Clone, Serialize, Deserialize)]
19+
pub struct ProducerMatrixConfig {
20+
pub batch_size: Vec<ByteSize>,
21+
pub queue_size: Vec<u64>,
22+
pub max_request_size: Vec<ByteSize>,
23+
pub linger: Vec<Duration>,
24+
pub server_timeout: Vec<Duration>,
25+
pub compression: Vec<Compression>,
26+
}
27+
28+
#[derive(Debug, Clone, Serialize, Deserialize)]
29+
pub struct ConsumerMatrixConfig {}
30+
31+
#[derive(Debug, Serialize, Deserialize, Clone)]
32+
pub struct SharedMatrixConfig {
33+
pub num_samples: Vec<usize>,
34+
pub time_between_samples: Vec<Duration>,
35+
pub worker_timeout: Vec<Duration>,
36+
pub topic_config: FluvioTopicMatrixConfig,
37+
pub load_config: BenchmarkLoadMatrixConfig,
38+
}
39+
40+
#[derive(Debug, Serialize, Deserialize, Clone)]
41+
pub struct BenchmarkLoadMatrixConfig {
42+
pub record_key_allocation_strategy: Vec<RecordKeyAllocationStrategy>,
43+
pub num_producers: Vec<u64>,
44+
pub num_records: Vec<u64>,
45+
pub record_size: Vec<ByteSize>,
46+
}
47+
48+
#[derive(Debug, Serialize, Deserialize, Clone)]
49+
pub struct FluvioTopicMatrixConfig {
50+
pub partitions: Vec<u32>,
51+
pub replicas: Vec<u32>,
52+
pub topic_name: Vec<String>,
53+
pub delete_topic: Vec<bool>,
54+
pub ignore_rack: Vec<bool>,
55+
}
56+
57+
impl Matrix {
58+
pub fn generate_configs(&self) -> Vec<BenchmarkConfig> {
59+
let builder: Vec<ProducerConfigBuilder> = vec![ProducerConfigBuilder::default()];
60+
61+
if let Some(producer_config) = &self.producer_config {
62+
let producer_config = builder
63+
.cross_iterate(&producer_config.batch_size, |v, b| {
64+
b.batch_size(v);
65+
})
66+
.cross_iterate(&producer_config.queue_size, |v, b| {
67+
b.queue_size(v);
68+
})
69+
.cross_iterate(&producer_config.max_request_size, |v, b| {
70+
b.max_request_size(v);
71+
})
72+
.cross_iterate(&producer_config.linger, |v, b| {
73+
b.linger(v);
74+
})
75+
.cross_iterate(&producer_config.server_timeout, |v, b| {
76+
b.server_timeout(v);
77+
})
78+
.cross_iterate(&producer_config.compression, |v, b| {
79+
b.compression(v);
80+
})
81+
.cross_iterate(&self.shared_config.num_samples, |v, b| {
82+
b.num_samples(v);
83+
})
84+
.cross_iterate(&self.shared_config.time_between_samples, |v, b| {
85+
b.time_between_samples(v);
86+
})
87+
.cross_iterate(&self.shared_config.worker_timeout, |v, b| {
88+
b.worker_timeout(v);
89+
})
90+
.cross_iterate(&self.shared_config.topic_config.partitions, |v, b| {
91+
b.partitions(v);
92+
})
93+
.cross_iterate(&self.shared_config.topic_config.replicas, |v, b| {
94+
b.replicas(v);
95+
})
96+
.cross_iterate(&self.shared_config.topic_config.topic_name, |v, b| {
97+
b.topic_name(v);
98+
})
99+
.cross_iterate(&self.shared_config.topic_config.delete_topic, |v, b| {
100+
b.delete_topic(v);
101+
})
102+
.cross_iterate(&self.shared_config.topic_config.ignore_rack, |v, b| {
103+
b.ignore_rack(v);
104+
})
105+
.cross_iterate(
106+
&self
107+
.shared_config
108+
.load_config
109+
.record_key_allocation_strategy,
110+
|v, b| {
111+
b.record_key_allocation_strategy(v);
112+
},
113+
)
114+
.cross_iterate(&self.shared_config.load_config.num_producers, |v, b| {
115+
b.num_producers(v);
116+
})
117+
.cross_iterate(&self.shared_config.load_config.num_records, |v, b| {
118+
b.num_records(v);
119+
})
120+
.cross_iterate(&self.shared_config.load_config.record_size, |v, b| {
121+
b.record_size(v);
122+
})
123+
.build();
124+
125+
return producer_config
126+
.into_iter()
127+
.map(BenchmarkConfig::Producer)
128+
.collect();
129+
}
130+
131+
if let Some(_consumer) = &self.consumer_config {
132+
todo!("Consumer config not implemented");
133+
}
134+
135+
panic!("No producer or consumer config provided");
136+
}
137+
}
138+
139+
pub fn default_config() -> Matrix {
140+
Matrix {
141+
producer_config: Some(ProducerMatrixConfig {
142+
batch_size: vec![ByteSize::kib(16), ByteSize::mib(1)],
143+
queue_size: vec![100],
144+
max_request_size: vec![ByteSize::mb(32)],
145+
linger: vec![Duration::from_millis(0)],
146+
server_timeout: vec![Duration::from_secs(600)],
147+
compression: vec![Compression::None],
148+
}),
149+
consumer_config: None,
150+
shared_config: SharedMatrixConfig {
151+
num_samples: vec![2],
152+
time_between_samples: vec![Duration::from_secs(1)],
153+
worker_timeout: vec![Duration::from_secs(60)],
154+
topic_config: FluvioTopicMatrixConfig {
155+
partitions: vec![1],
156+
replicas: vec![1],
157+
topic_name: vec![default_topic_name()],
158+
delete_topic: vec![true],
159+
ignore_rack: vec![true],
160+
},
161+
load_config: BenchmarkLoadMatrixConfig {
162+
record_key_allocation_strategy: vec![RecordKeyAllocationStrategy::NoKey],
163+
num_producers: vec![1],
164+
num_records: vec![100, 10_000, 100_000],
165+
record_size: vec![ByteSize::kib(5)],
166+
},
167+
},
168+
}
169+
}
170+
171+
#[cfg(test)]
172+
mod tests {
173+
use super::*;
174+
175+
#[test]
176+
fn test_default_config() {
177+
let matrix = default_config();
178+
let configs = matrix.generate_configs();
179+
180+
assert_eq!(configs.len(), 6);
181+
}
182+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use super::{ConsumerConfig, ConsumerConfigBuilder, ProducerConfig, ProducerConfigBuilder};
2+
3+
pub trait CrossIterate<C, B> {
4+
fn cross_iterate<T: Clone, F: Fn(T, &mut B) + Copy>(self, values: &[T], f: F) -> Self;
5+
fn build(self) -> Vec<C>;
6+
}
7+
8+
impl CrossIterate<ProducerConfig, ProducerConfigBuilder> for Vec<ProducerConfigBuilder> {
9+
fn cross_iterate<T: Clone, F: Fn(T, &mut ProducerConfigBuilder) + Copy>(
10+
self,
11+
values: &[T],
12+
f: F,
13+
) -> Self {
14+
self.into_iter()
15+
.flat_map(|builder| {
16+
values.iter().map(move |value| {
17+
let mut clone = builder.clone();
18+
f(value.clone(), &mut clone);
19+
clone
20+
})
21+
})
22+
.collect()
23+
}
24+
25+
fn build(self) -> Vec<ProducerConfig> {
26+
self.into_iter().map(|x| x.build().unwrap()).collect()
27+
}
28+
}
29+
30+
impl CrossIterate<ConsumerConfig, ConsumerConfigBuilder> for Vec<ConsumerConfigBuilder> {
31+
fn cross_iterate<T: Clone, F: Fn(T, &mut ConsumerConfigBuilder) + Copy>(
32+
self,
33+
values: &[T],
34+
f: F,
35+
) -> Self {
36+
self.into_iter()
37+
.flat_map(|builder| {
38+
values.iter().map(move |value| {
39+
let mut clone = builder.clone();
40+
f(value.clone(), &mut clone);
41+
clone
42+
})
43+
})
44+
.collect()
45+
}
46+
47+
fn build(self) -> Vec<ConsumerConfig> {
48+
self.into_iter().map(|x| x.build().unwrap()).collect()
49+
}
50+
}

0 commit comments

Comments
 (0)