Skip to content

Commit

Permalink
Create a load-testing tool and reducers to call for evaluating queries
Browse files Browse the repository at this point in the history
  • Loading branch information
mamcx committed Jan 29, 2025
1 parent c6a5f78 commit 334d1db
Show file tree
Hide file tree
Showing 2 changed files with 292 additions and 0 deletions.
109 changes: 109 additions & 0 deletions crates/bench/load.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Mini-tool for executing load testing and call reducer functions.
import argparse
import subprocess
import sys
import time
from datetime import datetime, timedelta


class ProgressBar:
    """A simple in-place textual progress bar rendered to stdout."""

    def __init__(self, total: int, label: str, size: int):
        """
        Initialize the progress bar.
        Args:
            total (int): The total number of steps/items.
            label (str): Label for the progress bar.
            size (int): The width of the progress bar.
        """
        self.total = total
        self.label = label
        self.size = size
        self.current = 0
        self.suffix = ""

    def show(self):
        """Render the bar in place (carriage return, no newline)."""
        # Guard against ZeroDivisionError when total == 0, which happens when
        # the caller computes e.g. int(duration * frequency) == 0.
        denominator = max(self.total, 1)
        progress = int(self.size * self.current / denominator)
        bar = "█" * progress + "." * (self.size - progress)
        print(f"{self.label} {bar} {self.current}/{self.total} {self.suffix}", end="\r", flush=True)

    def step(self, steps: int = 1):
        """Advance the bar by `steps`, clamped to `total`, and redraw."""
        self.current = min(self.current + steps, self.total)
        self.show()

    def finish(self):
        """Jump to 100%, redraw, and terminate the line with a newline."""
        self.current = self.total
        self.show()
        print()


def _run(progress: ProgressBar, title: str, cli: str, database: str, cmd: list):
    """Invoke every reducer in `cmd` against `database` via one CLI call each.

    Before each call the progress bar's label/suffix are refreshed so the
    user can see which reducer is currently executing.
    """
    progress.label = title
    for reducer in cmd:
        progress.suffix = f' {reducer}'
        progress.show()
        command = f'{cli} call {database} {reducer}'
        subprocess.check_call(command, shell=True)
        progress.step()


def run(cli: str, database: str, init: list, load: list, frequency: float, duration: float):
    """
    Execute the load test against `database`.

    Args:
        cli (str): Command used to invoke the spacetimedb CLI.
        database (str): Target database name.
        init (list): Reducers executed once, before the load phase.
        load (list): Reducers executed repeatedly during the load phase.
        frequency (float): Target batches of `load` reducers per second.
        duration (float): Length of the load phase, in seconds.

    Raises:
        ValueError: If `load` is non-empty and `frequency` is not positive.
    """
    print(f'Running load testing for database: {database}')
    print(f" Frequency: {frequency} calls/second, Duration: {duration} seconds")
    print()
    if init:
        progress = ProgressBar(len(init), label="Processing...", size=20)
        _run(progress, f'Init reducers for database: {database}', cli, database, init)
        progress.finish()

    if load:
        if frequency <= 0:
            raise ValueError("frequency must be positive")
        end_time = datetime.now() + timedelta(seconds=duration)
        interval = 1.0 / frequency

        # Clamp to at least one batch so the ProgressBar total is never 0
        # (e.g. frequency=0.5, duration=1 would otherwise give total=0).
        total_batches = max(int(duration * frequency), 1)
        progress = ProgressBar(total_batches * len(load), label="Processing...", size=20)
        while datetime.now() < end_time:
            batch_started = time.monotonic()
            _run(progress, f'Load reducers for database: {database}', cli, database, load)
            # Sleep only for the remainder of the interval: a fixed
            # sleep(interval) would add the reducer-call latency on top and
            # drag the effective call rate below the requested frequency.
            remaining = interval - (time.monotonic() - batch_started)
            if remaining > 0:
                time.sleep(remaining)
        progress.finish()

    print(f'Load testing for database: {database} finished.')


if __name__ == '__main__':
    """
    Usage:
        python load.py -d <database> -i <init_reducers> -l <load_reducers> [--no-cli] [-f <frequency>] [-s <seconds>]
    Example:
        python load.py -d quickstart -f 2 -s 10 -i "insert_bulk_small_rows 100" -l "queries 'small, inserts:10,query:10,deletes:10';"
    """
    import shutil

    parser = argparse.ArgumentParser()

    parser.add_argument('-d', '--database', type=str, help='Database name', required=True)
    parser.add_argument('-i', '--init', type=str, help='Init reducers, separated by ;')
    parser.add_argument('-l', '--load', type=str, help='Load reducers, separated by ;')
    parser.add_argument('-f', '--frequency', type=float, default=1.0,
                        help="Frequency (calls per second)")
    parser.add_argument('-s', '--seconds', type=float, default=1.0, help="Duration (in seconds)")
    parser.add_argument('--no-cli', action='store_false', dest='cli',
                        help='Disable spacetime-cli if true, run `cargo run...` instead',
                        default=True)

    args = vars(parser.parse_args())

    database = args['database']
    cli = args['cli']
    frequency = args['frequency']
    duration = args['seconds']

    # Reducer lists are `;`-separated; drop empties from trailing separators.
    init = [x.strip() for x in (args['init'] or '').split(';') if x.strip()]
    load = [x.strip() for x in (args['load'] or '').split(';') if x.strip()]

    if cli:
        # Resolve the installed CLI from PATH instead of the previous
        # hard-coded, machine-specific absolute path
        # (/Users/mamcx/.cargo/bin/spacetimedb-cli), which only worked on
        # the original author's machine.
        cli = shutil.which('spacetimedb-cli') or 'spacetimedb-cli'
    else:
        cli = 'cargo run -p spacetimedb-cli --bin spacetimedb-cli'

    run(cli, database, init, load, frequency, duration)
183 changes: 183 additions & 0 deletions modules/benchmarks/src/synthetic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,105 @@ pub fn load(ctx: &ReducerContext, input: String) -> Result<(), String> {

Ok(())
}

/// Used to execute a series of reducers in sequence for benchmarking purposes.
///
/// The input is a string with the following format:
///
/// `load_type`: [`Load`], `inserts`: `u32`, `query`: `u32`, `deletes`: `u32`
///
/// The `inserts`, `query`, and `deletes` arguments may appear in any order in
/// the input string, but execution always happens in a fixed order:
/// inserts first, then queries, then deletes.
///
/// # Errors
///
/// Returns `Err` for an unknown load type, a malformed `operation:count`
/// pair, an unknown operation, or (for `tiny`) a count that does not fit
/// in `u8`.
#[spacetimedb::reducer]
pub fn queries(ctx: &ReducerContext, input: String) -> Result<(), String> {
    let args = input.split(',').map(|x| x.trim().to_lowercase()).collect::<Vec<_>>();
    if args.len() < 2 {
        return Err(format!("Expected at least 2 arguments, got {}", args.len()));
    }
    let load = match args[0].as_str() {
        "tiny" => Load::Tiny,
        "small" => Load::Small,
        "medium" => Load::Medium,
        "large" => Load::Large,
        x => {
            return Err(format!(
                "Invalid load type: '{x}', expected: tiny, small, medium, or large"
            ))
        }
    };

    let mut inserts = 0u64;
    let mut queries = 0u64;
    let mut deletes = 0u64;

    for arg in args.iter().skip(1) {
        let parts = arg.split(':').map(|x| x.trim()).collect::<Vec<_>>();
        if parts.len() != 2 {
            return Err(format!("Invalid argument: '{arg}', expected: 'operation:count'"));
        }
        let count = parts[1].parse::<u64>().map_err(|e| format!("Invalid count: {}", e))?;
        match parts[0] {
            "inserts" => inserts = count,
            "query" => queries = count,
            "deletes" => deletes = count,
            x => {
                return Err(format!(
                    "Invalid operation: '{x}', expected: inserts, query, or deletes"
                ))
            }
        }
    }

    log::info!("Executing queries: inserts: {inserts}, query: {queries}, deletes: {deletes}");
    // To allow inserting duplicate rows, the `id` columns do not use the
    // `[unique]` attribute, which also means the `update` accessor is not
    // available on these tables.
    match load {
        Load::Tiny => {
            // Tiny rows use `u8` ids. Reject counts that would not fit
            // instead of silently wrapping (the previous `as u8` casts
            // truncated any value > 255, e.g. 300 became 44).
            let inserts =
                u8::try_from(inserts).map_err(|_| "Tiny load: 'inserts' must fit in u8 (0..=255)".to_string())?;
            let queries =
                u8::try_from(queries).map_err(|_| "Tiny load: 'query' must fit in u8 (0..=255)".to_string())?;
            let deletes =
                u8::try_from(deletes).map_err(|_| "Tiny load: 'deletes' must fit in u8 (0..=255)".to_string())?;
            if inserts > 0 {
                insert_bulk_tiny_rows(ctx, inserts);
            }
            for id in 0..queries {
                filter_tiny_rows_by_id(ctx, id);
            }
            for id in 0..deletes {
                delete_tiny_rows_by_id(ctx, id);
            }
        }
        Load::Small => {
            if inserts > 0 {
                insert_bulk_small_rows(ctx, inserts);
            }
            for id in 0..queries {
                filter_small_rows_by_id(ctx, id);
            }
            for id in 0..deletes {
                delete_small_rows_by_id(ctx, id);
            }
        }
        Load::Medium => {
            if inserts > 0 {
                insert_bulk_medium_var_rows(ctx, inserts);
            }
            for id in 0..queries {
                filter_medium_var_rows_by_id(ctx, id);
            }
            for id in 0..deletes {
                delete_medium_var_rows_by_id(ctx, id);
            }
        }
        Load::Large => {
            if inserts > 0 {
                insert_bulk_large_var_rows(ctx, inserts);
            }
            for id in 0..queries {
                filter_large_var_rows_by_id(ctx, id as u128);
            }
            for id in 0..deletes {
                delete_large_var_rows_by_id(ctx, id as u128);
            }
        }
    }

    Ok(())
}
// ---------- update ----------

#[spacetimedb::reducer]
Expand Down Expand Up @@ -665,6 +764,55 @@ pub fn filter_btree_each_column_u32_u64_u64_by_y(ctx: &ReducerContext, y: u64) {
}
}

/// Scan `tiny_rows` for rows whose `id` equals `id`; `black_box` keeps the
/// per-row work observable to the optimizer for benchmarking.
#[spacetimedb::reducer]
pub fn filter_tiny_rows_by_id(ctx: &ReducerContext, id: u8) {
    ctx.db
        .tiny_rows()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

/// Scan `small_rows` for rows matching `id`, consuming each match through
/// `black_box` so the scan is not optimized away.
#[spacetimedb::reducer]
pub fn filter_small_rows_by_id(ctx: &ReducerContext, id: u64) {
    ctx.db
        .small_rows()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

/// Scan `small_btree_each_column_rows` for rows matching `id`; matches are
/// fed to `black_box` to keep the benchmark workload honest.
#[spacetimedb::reducer]
pub fn filter_small_btree_each_column_rows_by_id(ctx: &ReducerContext, id: u64) {
    ctx.db
        .small_btree_each_column_rows()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

/// Scan `medium_var_rows` for rows matching `id`, consuming matches via
/// `black_box` so the work cannot be elided.
#[spacetimedb::reducer]
pub fn filter_medium_var_rows_by_id(ctx: &ReducerContext, id: u64) {
    ctx.db
        .medium_var_rows()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

/// Scan `medium_var_rows_btree_each_column` for rows matching `id`; each
/// match passes through `black_box` for benchmarking.
#[spacetimedb::reducer]
pub fn filter_medium_var_rows_btree_each_column_by_id(ctx: &ReducerContext, id: u64) {
    ctx.db
        .medium_var_rows_btree_each_column()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

/// Scan `large_var_rows` for rows matching `id`, consuming matches via
/// `black_box` so the scan is not optimized away.
#[spacetimedb::reducer]
pub fn filter_large_var_rows_by_id(ctx: &ReducerContext, id: u128) {
    ctx.db
        .large_var_rows()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

/// Scan `large_var_rows_btree_each_column` for rows matching `id`; each
/// match passes through `black_box` for benchmarking.
#[spacetimedb::reducer]
pub fn filter_large_var_rows_btree_each_column_by_id(ctx: &ReducerContext, id: u128) {
    ctx.db
        .large_var_rows_btree_each_column()
        .iter()
        .filter(|r| r.id == id)
        .for_each(|row| {
            black_box(row);
        });
}

// ---------- delete ----------

// FIXME: current nonunique delete interface is UNUSABLE!!!!
Expand All @@ -679,6 +827,41 @@ pub fn delete_unique_0_u32_u64_u64_by_id(ctx: &ReducerContext, id: u32) {
ctx.db.unique_0_u32_u64_u64().id().delete(id);
}

/// Delete from `tiny_rows` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_tiny_rows_by_id(ctx: &ReducerContext, id: u8) {
    let table = ctx.db.tiny_rows();
    table.id().delete(id);
}

/// Delete from `small_rows` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_small_rows_by_id(ctx: &ReducerContext, id: u64) {
    let table = ctx.db.small_rows();
    table.id().delete(id);
}

/// Delete from `small_btree_each_column_rows` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_small_btree_each_column_rows_by_id(ctx: &ReducerContext, id: u64) {
    let table = ctx.db.small_btree_each_column_rows();
    table.id().delete(id);
}

/// Delete from `medium_var_rows` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_medium_var_rows_by_id(ctx: &ReducerContext, id: u64) {
    let table = ctx.db.medium_var_rows();
    table.id().delete(id);
}

/// Delete from `medium_var_rows_btree_each_column` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_medium_var_rows_btree_each_column_by_id(ctx: &ReducerContext, id: u64) {
    let table = ctx.db.medium_var_rows_btree_each_column();
    table.id().delete(id);
}

/// Delete from `large_var_rows` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_large_var_rows_by_id(ctx: &ReducerContext, id: u128) {
    let table = ctx.db.large_var_rows();
    table.id().delete(id);
}

/// Delete from `large_var_rows_btree_each_column` via the `id` index accessor.
#[spacetimedb::reducer]
pub fn delete_large_var_rows_btree_each_column_by_id(ctx: &ReducerContext, id: u128) {
    let table = ctx.db.large_var_rows_btree_each_column();
    table.id().delete(id);
}

// ---------- clear table ----------
#[spacetimedb::reducer]
pub fn clear_table_unique_0_u32_u64_str(_ctx: &ReducerContext) {
Expand Down

0 comments on commit 334d1db

Please sign in to comment.