Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compress the snapshot & commit log #2034

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ uuid = { version = "1.2.1", features = ["v4"] }
walkdir = "2.2.5"
wasmbin = "0.6"
webbrowser = "1.0.2"
zstd = { version = "0.13.2", features = ["arrays", "zdict_builder"] }
snap = "1.1.1"
lz4_flex = { version = "0.11" }

xdg = "2.5"

# Vendor the openssl we rely on, rather than depend on a
Expand Down
2 changes: 2 additions & 0 deletions crates/bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ spacetimedb-paths.workspace = true
spacetimedb-primitives = { path = "../primitives" }
spacetimedb-sats = { path = "../sats" }
spacetimedb-schema = { workspace = true, features = ["test"] }
spacetimedb-snapshot = { path = "../snapshot" }
spacetimedb-standalone = { path = "../standalone" }
spacetimedb-table = { path = "../table" }
spacetimedb-testing = { path = "../testing" }
spacetimedb-fs-utils.workspace = true

anyhow.workspace = true
anymap.workspace = true
Expand Down
135 changes: 134 additions & 1 deletion crates/bench/benches/special.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,29 @@
use criterion::async_executor::AsyncExecutor;
use criterion::{criterion_group, criterion_main, Criterion, SamplingMode};
use mimalloc::MiMalloc;
use spacetimedb::db::datastore::traits::IsolationLevel;
use spacetimedb::db::relational_db::tests_utils::{make_snapshot, TestDB};
use spacetimedb::db::relational_db::{open_snapshot_repo, RelationalDB};
use spacetimedb::execution_context::Workload;
use spacetimedb_bench::{
database::BenchDatabase,
schemas::{create_sequential, u32_u64_str, u32_u64_u64, u64_u64_u32, BenchTable, RandomTable},
spacetime_module::SpacetimeModule,
};
use spacetimedb_fs_utils::compression::CompressType;
use spacetimedb_lib::sats::{self, bsatn};
use spacetimedb_lib::{bsatn::ToBsatn as _, ProductValue};
use spacetimedb_lib::{bsatn::ToBsatn as _, Identity, ProductValue};
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
use spacetimedb_paths::FromPathUnchecked;
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_snapshot::{SnapshotRepository, SnapshotSize};
use spacetimedb_testing::modules::{Csharp, ModuleLanguage, Rust};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Duration;
use tempdir::TempDir;
use spacetimedb_sats::bsatn::to_vec;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand All @@ -30,6 +42,9 @@ fn custom_benchmarks<L: ModuleLanguage>(c: &mut Criterion) {

custom_module_benchmarks(&db, c);
custom_db_benchmarks(&db, c);

snapshot(c);
snapshot_existing(c);
}

fn custom_module_benchmarks<L: ModuleLanguage>(m: &SpacetimeModule<L>, c: &mut Criterion) {
Expand Down Expand Up @@ -186,5 +201,123 @@ fn serialize_benchmarks<
// TODO: deserialize benches (needs a typespace)
}

/// Shared driver for the snapshot-compression benchmarks.
///
/// `take` populates a snapshot in `dir` through the given repository; the
/// driver then benchmarks saving and re-opening that snapshot under every
/// supported compression algorithm.
///
/// Throughput is reported in bytes of the *uncompressed* snapshot
/// (`CompressType::None`), so the compressed variants are all compared
/// against the same baseline.
fn _snapshot<F>(c: &mut Criterion, name: &str, dir: SnapshotsPath, take: F)
where
    F: Fn(&SnapshotRepository),
{
    let mut disk_size = None;
    let mut size_on_disk = |size: SnapshotSize| {
        if size.compressed_type == CompressType::None {
            // Remember the uncompressed size; it becomes the group's throughput.
            disk_size = Some(size.clone());
        }
        // Deliberate: print each snapshot's on-disk size when the bench runs.
        dbg!(&size);
    };

    let algos = [
        CompressType::None,
        CompressType::Zstd,
        CompressType::Lz4,
        CompressType::Snap,
    ];
    // Take one snapshot per algorithm up front so every size is reported and
    // the uncompressed baseline is captured before benchmarking starts.
    for compress in &algos {
        let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, true);
        take(&repo);
        size_on_disk(repo.size_on_disk_last_snapshot().unwrap());
    }

    // Fixed: the group name previously ended with a stray `]`
    // ("special/snapshot/{name}]"); also dropped the needless `&` on `format!`.
    let mut group = c.benchmark_group(format!("special/snapshot/{name}"));
    group.throughput(criterion::Throughput::Bytes(disk_size.unwrap().total_size));
    group.sample_size(50);
    group.warm_up_time(Duration::from_secs(10));
    group.sampling_mode(SamplingMode::Flat);

    for compress in &algos {
        group.bench_function(format!("save_compression_{compress:?}"), |b| {
            b.iter_batched(
                || {},
                |_| {
                    // Re-take the snapshot with this compression algorithm.
                    let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, true);
                    take(&repo);
                },
                criterion::BatchSize::NumIterations(100),
            );
        });

        group.bench_function(format!("open_compression_{compress:?}"), |b| {
            b.iter_batched(
                || {},
                |_| {
                    // Open the existing repository (no re-take) and read back
                    // the latest snapshot.
                    let (_, repo) = make_snapshot(dir.clone(), Identity::ZERO, 0, *compress, false);
                    let last = repo.latest_snapshot().unwrap().unwrap();
                    repo.read_snapshot(last).unwrap()
                },
                criterion::BatchSize::NumIterations(100),
            );
        });
    }
}

/// Benchmarks snapshot compression against a small synthetic database:
/// two tables (`u32_u64_str` and `u32_u64_u64`) of 1_000 rows each.
fn snapshot(c: &mut Criterion) {
    let db = TestDB::in_memory().unwrap();

    let snapshot_dir = db.path().snapshots();
    snapshot_dir.create().unwrap();

    // Build both table schemas from their product types.
    let mut schema_str = TableSchema::from_product_type(u32_u64_str::product_type());
    schema_str.table_name = "u32_u64_str".into();
    let mut schema_u64 = TableSchema::from_product_type(u32_u64_u64::product_type());
    schema_u64.table_name = "u32_u64_u64".into();

    // Create the tables and fill each with 1_000 deterministic rows,
    // all within a single serializable transaction.
    let mut tx = db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
    let table_str = db.create_table(&mut tx, schema_str).unwrap();
    let table_u64 = db.create_table(&mut tx, schema_u64).unwrap();

    for row in create_sequential::<u32_u64_str>(0xdeadbeef, 1_000, 100) {
        let bytes = to_vec(&row.into_product_value()).unwrap();
        db.insert(&mut tx, table_str, &bytes).unwrap();
    }
    for row in create_sequential::<u32_u64_u64>(0xdeadbeef, 1_000, 100) {
        let bytes = to_vec(&row.into_product_value()).unwrap();
        db.insert(&mut tx, table_u64, &bytes).unwrap();
    }
    db.commit_tx(tx).unwrap();

    _snapshot(c, "synthetic", snapshot_dir, |repo| {
        db.take_snapshot(repo).unwrap();
    });
}

// Benchmarks compression against an existing on-disk database.
// The database location and replica identity must be supplied via `ENV`:
// - `SNAPSHOT`: the path to the database, like `/tmp/db/replicas/.../8/database`
// - `IDENTITY`: the identity in hex format
fn snapshot_existing(c: &mut Criterion) {
    // Skip silently (with a hint) when the env var is absent, so the regular
    // bench run is unaffected.
    let Ok(path_db) = std::env::var("SNAPSHOT") else {
        eprintln!("SNAPSHOT must be set to a valid path to the database");
        return;
    };
    let identity_hex = std::env::var("IDENTITY").expect("IDENTITY must be set to a valid hex identity");
    let identity = Identity::from_hex(identity_hex).unwrap();

    let replica_dir = ReplicaDir::from_path_unchecked(PathBuf::from(path_db));
    let repo = open_snapshot_repo(replica_dir.snapshots(), Identity::ZERO, 0).unwrap();

    let latest = repo.latest_snapshot().unwrap();
    let db = RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), latest).unwrap();

    // Re-snapshot into a throwaway directory so the source database is not touched.
    let tmp = TempDir::new("snapshots").unwrap();
    let out_dir = SnapshotsPath::from_path_unchecked(tmp.path());

    _snapshot(c, "existing", out_dir, |repo| {
        db.take_snapshot(repo).unwrap();
    });
}

// Register the Criterion entry points. `criterion_benchmark` is defined
// earlier in this file (outside this excerpt).
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
1 change: 1 addition & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ serde = { workspace = true, optional = true }
spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-sats.workspace = true
spacetimedb-fs-utils.workspace = true
tempfile.workspace = true
thiserror.workspace = true

Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ spacetimedb-table.workspace = true
spacetimedb-vm.workspace = true
spacetimedb-snapshot.workspace = true
spacetimedb-expr.workspace = true
spacetimedb-fs-utils.workspace = true

anyhow = { workspace = true, features = ["backtrace"] }
arrayvec.workspace = true
Expand Down
Loading
Loading