diff --git a/smoketests/config.toml b/smoketests/config.toml
index 258c55e031c..866c5e585a9 100644
--- a/smoketests/config.toml
+++ b/smoketests/config.toml
@@ -1,6 +1,5 @@
 default_server = "127.0.0.1:3000"
-
-spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwMTk4MDViYzU2ZDczMmUwN2ZjZjlkYWJjYTJkY2I3ZGIwMzMwYjQyOGJjMTI0Y2E0YmMzNWQxNTgzZTkxIiwic3ViIjoiMDMwYmM2NjAtMDdlZC00ZDg0LThkMTYtMDRmMTUzMTgzZjZhIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTczODk1MjY3OCwiZXhwIjpudWxsfQ.TInoGjszYrkVw6vqk_vfj_zgirJcf2-8aNQZHr8e0dJRYK9sCMh2RAjJ_odTbgXRmEIaW416vOZjFGVzuUH6Sg"
+spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwZTJlMWRlNTY0YmU2YzAyMDU4YzQwYzg0ZDdhNjg4OWE3MzA4ZTRjYjA5OGU1NTRjZDhlZWE1MDNhN2E3Iiwic3ViIjoiMGFmOTM0OTUtZGY1Yy00Y2ViLTllNjktNmVhZDVlMDVmMGUzIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTc0MDczODI4OSwiZXhwIjpudWxsfQ.yF-glntsyd5xusUDFFtSWkPtONB70H4lLeDrd84m6ZCJ67W_1RegIqIpCDMaAJFmY5euDAO7UWUFiicJ86vSrA"
 
 [[server_configs]]
 nickname = "localhost"
diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py
index 1216990acce..976234f3d3d 100644
--- a/smoketests/tests/replication.py
+++ b/smoketests/tests/replication.py
@@ -198,15 +198,16 @@ def wait_for_leader_change(self, previous_leader_node, max_attempts=10, delay=2)
             time.sleep(delay)
         return None
 
-    def ensure_leader_health(self, wait_time=2):
-        """Verify leader is healthy by testing basic operations."""
+    def ensure_leader_health(self, id, wait_time=2):
+        """Verify leader is healthy by inserting a row."""
         if wait_time:
             time.sleep(wait_time)
 
-        rnd = random.randint(9000, 10000)
-        retry(lambda: self.test.call("add", f"{rnd}"))
-        add_table = str(self.test.sql(f"SELECT id, text FROM message where text='{rnd}'"))
-        return str(rnd) in add_table
+        retry(lambda: self.test.call("start", id, 1))
+        add_table = str(self.test.sql(f"SELECT id FROM counter where id={id}"))
+        if str(id) not in add_table:
+            raise ValueError(f"Could not find {id} in counter table")
+
 
     def fail_leader(self, action='kill'):
         """Force leader failure through either killing or network disconnect."""
@@ -238,27 +239,55 @@ def restore_leader(self, container_id, action='start'):
 @requires_docker
 class ReplicationTest(Smoketest):
     MODULE_CODE = """
-use spacetimedb::{ReducerContext, Table, log};
+use spacetimedb::{duration, ReducerContext, Table};
 
-#[spacetimedb::table(name = message, public)]
-pub struct Message {
+#[spacetimedb::table(name = counter, public)]
+pub struct Counter {
     #[primary_key]
     #[auto_inc]
     id: u64,
-    text: String,
+    #[index(btree)]
+    value: u64,
 }
 
-#[spacetimedb::reducer]
-fn add(ctx: &ReducerContext, text: String) {
-    log::info!("adding message: {}", text);
-    ctx.db.message().insert(Message {id:0, text});
+#[spacetimedb::table(name = schedule_counter, public, scheduled(increment, at = sched_at))]
+pub struct ScheduledCounter {
+    #[primary_key]
+    #[auto_inc]
+    scheduled_id: u64,
+    sched_at: spacetimedb::ScheduleAt,
+    count: u64,
 }
 
+#[spacetimedb::reducer]
+fn increment(ctx: &ReducerContext, arg: ScheduledCounter) {
+    // if the counter exists, increment it
+    if let Some(counter) = ctx.db.counter().id().find(arg.scheduled_id) {
+        if counter.value == arg.count {
+            // target reached: delete the schedule row so the loop stops
+            ctx.db.schedule_counter().delete(arg);
+            return;
+        }
+        // update counter
+        ctx.db.counter().id().update(Counter {
+            id: arg.scheduled_id,
+            value: counter.value + 1,
+        });
+    } else {
+        // insert fresh counter
+        ctx.db.counter().insert(Counter {
+            id: arg.scheduled_id,
+            value: 1,
+        });
+    }
+}
+
 #[spacetimedb::reducer]
-fn clean_up(ctx: &ReducerContext) {
-    log::info!("cleaning up messages");
-    ctx.db.message().iter().for_each(|m| {ctx.db.message().delete(m); });
+fn start(ctx: &ReducerContext, id: u64, count: u64) {
+    ctx.db.schedule_counter().insert(ScheduledCounter {
+        scheduled_id: id,
+        // a 0ms interval re-invokes `increment` each tick until it deletes
+        // its own schedule row
+        sched_at: duration!(0ms).into(),
+        count,
+    });
 }
 """
@@ -281,47 +310,44 @@ def add_me_as_admin(self):
         spacetime("--config-path", self.root_config, "login", "--token", self.root_token)
         spacetime("--config-path", self.root_config, "call", "spacetime-control", "create_admin_account", f"0x{db_owner_id}")
 
-    def add(self, r: range):
+    def start(self, id: int, count: int):
         """Send a message to the database."""
-        for i in r:
-            retry(lambda: self.call("add", f"{i}"))
-
-    def count_rows(self):
-        message_tb_raw = str(self.sql("SELECT id FROM message"))
-        # -2 to remove header
-        return len(message_tb_raw.splitlines()) - 2
+        retry(lambda: self.call("start", id, count))
 
-
-    def test_leader_b_election_in_loop(self):
+    def test_leader_election_in_loop(self):
         """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader"""
-
-        for i in range(5):
+        iterations = 5
+        row_ids = [101 + i for i in range(iterations * 2)]
+        for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]):
             cur_leader = self.cluster.wait_for_leader_change(None)
-            self.cluster.ensure_leader_health()
+            self.cluster.ensure_leader_health(first_id)
 
             print("killing current leader: {}", cur_leader)
             container_id = self.cluster.fail_leader()
-
             self.assertIsNotNone(container_id)
 
             next_leader = self.cluster.wait_for_leader_change(cur_leader)
             self.assertNotEqual(cur_leader, next_leader) # this check if leader election happened
-            self.cluster.ensure_leader_health()
-            self.assertEqual(self.count_rows(), 2 * (i+1))
+            self.cluster.ensure_leader_health(second_id)
 
             # restart the old leader, so that we can maintain quorum for next iteration
             self.cluster.restore_leader(container_id, 'start')
 
-        time.sleep(5)
-        retry(lambda: self.call("clean_up"))
+        # verify that all rows written so far are present on the new leader
+        for row_id in row_ids:
+            table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}")
+            self.assertIn(f"{row_id}", table)
 
     def test_leader_c_disconnect_in_loop(self):
         """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader"""
 
-        for i in range(5):
+        iterations = 5
+        row_ids = [201 + i for i in range(iterations * 2)]
+
+        for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]):
             cur_leader = self.cluster.wait_for_leader_change(None)
-            self.cluster.ensure_leader_health()
+            self.cluster.ensure_leader_health(first_id)
 
             container_id = self.cluster.fail_leader('disconnect')
@@ -330,16 +356,16 @@ def test_leader_c_disconnect_in_loop(self):
 
             next_leader = self.cluster.wait_for_leader_change(cur_leader)
             self.assertNotEqual(cur_leader, next_leader) # this check if leader election happened
-            self.cluster.ensure_leader_health()
-            self.assertEqual(self.count_rows(), 2 * (i+1))
+            self.cluster.ensure_leader_health(second_id)
 
             # restart the old leader, so that we can maintain quorum for next iteration
             self.cluster.restore_leader(container_id, 'connect')
             time.sleep(1)
-
-        time.sleep(5)
-        retry(lambda: self.call("clean_up"))
+
+        # verify that all rows written so far are present on the new leader
+        for row_id in row_ids:
+            table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}")
self.assertIn(f"{row_id}", table) # def test_drain_leader_node(self): @@ -365,7 +391,7 @@ def test_prefer_leader(self): """This test moves leader replica to different node""" self.add_me_as_admin() cur_leader_node_id = self.cluster.wait_for_leader_change(None) - self.cluster.ensure_leader_health() + self.cluster.ensure_leader_health(301) replicas = self.cluster.get_all_replicas() prefer_replica = {} @@ -377,22 +403,27 @@ def test_prefer_leader(self): self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}") next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id) - self.cluster.ensure_leader_health() + self.cluster.ensure_leader_health(302) self.assertEqual(prefer_replica['node_id'], next_leader_node_id) - self.assertEqual(self.count_rows(), 2) + + + # verify if all past rows are present in new leader + for row_id in [301, 302]: + table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}") + self.assertIn(f"{row_id}", table) def test_a_many_transactions(self): """This test sends many messages to the database and verifies that they are all present""" - cur_leader = self.cluster.wait_for_leader_change(None) - num_messages = 1000 - self.add(range(num_messages)) - message_table = self.sql(f"SELECT text FROM message where text='{num_messages -1}'") - self.assertIn("999", message_table) - self.assertEqual(self.count_rows(), num_messages) + self.cluster.wait_for_leader_change(None) + num_messages = 10000 + sub = self.subscribe("select * from counter", n=num_messages) + self.start(1, num_messages) + + message_table = sub()[-1:]; + self.assertIn({'counter': {'deletes': [{'id': 1, 'value': 9999}], 'inserts': [{'id': 1, 'value': 10000}]}}, message_table) - retry(lambda: self.call("clean_up")) # def test_quorum_loss(self): # """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes"""