faster tx module
Shubham8287 committed Mar 5, 2025
1 parent 36a2726 commit b99c371
Showing 2 changed files with 84 additions and 54 deletions.
3 changes: 1 addition & 2 deletions smoketests/config.toml
@@ -1,6 +1,5 @@
default_server = "127.0.0.1:3000"

spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwMTk4MDViYzU2ZDczMmUwN2ZjZjlkYWJjYTJkY2I3ZGIwMzMwYjQyOGJjMTI0Y2E0YmMzNWQxNTgzZTkxIiwic3ViIjoiMDMwYmM2NjAtMDdlZC00ZDg0LThkMTYtMDRmMTUzMTgzZjZhIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTczODk1MjY3OCwiZXhwIjpudWxsfQ.TInoGjszYrkVw6vqk_vfj_zgirJcf2-8aNQZHr8e0dJRYK9sCMh2RAjJ_odTbgXRmEIaW416vOZjFGVzuUH6Sg"
spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwN2M3Y2E5ZjJjYjliMjcwOTYyMWU3NWExMzU4NDNlZTljNjBkZjc2YzFjNzI5NDdkOGFiMmY0NmE0OWJiIiwic3ViIjoiMDNlYmExYWUtN2U2Ny00MzY3LWJkNWUtMTc3YzE1MTA4MTdkIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTc0MTIwMDE1NSwiZXhwIjpudWxsfQ.ZXKZUZeQ6y5RPvv8x08DYfXV7tDSQAsxX1TjcWDHXnTxakAG-ZvVyCed7UDjqseZKqwTB-zL3-rayjk6roiM7g"

[[server_configs]]
nickname = "localhost"
135 changes: 83 additions & 52 deletions smoketests/tests/replication.py
@@ -198,15 +198,16 @@ def wait_for_leader_change(self, previous_leader_node, max_attempts=10, delay=2)
time.sleep(delay)
return None

def ensure_leader_health(self, wait_time=2):
"""Verify leader is healthy by testing basic operations."""
def ensure_leader_health(self, id, wait_time=2):
"""Verify leader is healthy by inserting a row."""
if wait_time:
time.sleep(wait_time)

rnd = random.randint(9000, 10000)
retry(lambda: self.test.call("add", f"{rnd}"))
add_table = str(self.test.sql(f"SELECT id, text FROM message where text='{rnd}'"))
return str(rnd) in add_table
retry(lambda: self.test.call("start", id, 1))
add_table = str(self.test.sql(f"SELECT id FROM counter where id={id}"))
if str(id) not in add_table:
raise ValueError(f"Could not find {id} in counter table")


def fail_leader(self, action='kill'):
"""Force leader failure through either killing or network disconnect."""
@@ -238,27 +239,55 @@ def restore_leader(self, container_id, action='start'):
@requires_docker
class ReplicationTest(Smoketest):
MODULE_CODE = """
use spacetimedb::{ReducerContext, Table, log};
use spacetimedb::{duration, ReducerContext, Table};
#[spacetimedb::table(name = message, public)]
pub struct Message {
#[spacetimedb::table(name = counter, public)]
pub struct Counter {
#[primary_key]
#[auto_inc]
id: u64,
text: String,
#[index(btree)]
value: u64,
}
#[spacetimedb::reducer]
fn add(ctx: &ReducerContext, text: String) {
log::info!("adding message: {}", text);
ctx.db.message().insert(Message {id:0, text});
#[spacetimedb::table(name = schedule_counter, public, scheduled(increment, at = sched_at))]
pub struct ScheduledCounter {
#[primary_key]
#[auto_inc]
scheduled_id: u64,
sched_at: spacetimedb::ScheduleAt,
count: u64,
}
#[spacetimedb::reducer]
fn increment(ctx: &ReducerContext, arg: ScheduledCounter) {
// if the counter exists, increment it
if let Some(counter) = ctx.db.counter().id().find(arg.scheduled_id) {
if counter.value == arg.count {
ctx.db.schedule_counter().delete(arg);
return;
}
// update counter
ctx.db.counter().id().update(Counter {
id: arg.scheduled_id,
value: counter.value + 1,
});
} else {
// insert fresh counter
ctx.db.counter().insert(Counter {
id: arg.scheduled_id,
value: 1,
});
}
}
#[spacetimedb::reducer]
fn clean_up(ctx: &ReducerContext) {
log::info!("cleaning up messages");
ctx.db.message().iter().for_each(|m| {ctx.db.message().delete(m); });
fn start(ctx: &ReducerContext, id: u64, count: u64) {
ctx.db.schedule_counter().insert(ScheduledCounter {
scheduled_id: id,
sched_at: duration!(0ms).into(),
count,
});
}
"""

@@ -281,47 +310,44 @@ def add_me_as_admin(self):
spacetime("--config-path", self.root_config, "login", "--token", self.root_token)
spacetime("--config-path", self.root_config, "call", "spacetime-control", "create_admin_account", f"0x{db_owner_id}")

def add(self, r: range):
def start(self, id: int, count: int):
"""Send a message to the database."""
for i in r:
retry(lambda: self.call("add", f"{i}"))

def count_rows(self):
message_tb_raw = str(self.sql("SELECT id FROM message"))
# -2 to remove header
return len(message_tb_raw.splitlines()) - 2
retry(lambda: self.call("start", id, count))


def test_leader_b_election_in_loop(self):
def test_leader_election_in_loop(self):
"""This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader"""

for i in range(5):
iterations = 5
row_ids = [101 + i for i in range(iterations * 2)]
for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]):
cur_leader = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health()
self.cluster.ensure_leader_health(first_id)

print("killing current leader: {}", cur_leader)
container_id = self.cluster.fail_leader()


self.assertIsNotNone(container_id)

next_leader = self.cluster.wait_for_leader_change(cur_leader)
self.assertNotEqual(cur_leader, next_leader)
# this checks that leader election happened
self.cluster.ensure_leader_health()
self.assertEqual(self.count_rows(), 2 * (i+1))
self.cluster.ensure_leader_health(second_id)
# restart the old leader, so that we can maintain quorum for next iteration
self.cluster.restore_leader(container_id, 'start')

time.sleep(5)
retry(lambda: self.call("clean_up"))
# verify that all past rows are present on the new leader
for row_id in row_ids:
table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}")
self.assertIn(f"{row_id}", table)

def test_leader_c_disconnect_in_loop(self):
"""This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader"""

for i in range(5):
iterations = 5
row_ids = [201 + i for i in range(iterations * 2)]

for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]):
cur_leader = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health()
self.cluster.ensure_leader_health(first_id)

container_id = self.cluster.fail_leader('disconnect')

@@ -330,16 +356,16 @@ def test_leader_c_disconnect_in_loop(self):
next_leader = self.cluster.wait_for_leader_change(cur_leader)
self.assertNotEqual(cur_leader, next_leader)
# this checks that leader election happened
self.cluster.ensure_leader_health()
self.assertEqual(self.count_rows(), 2 * (i+1))
self.cluster.ensure_leader_health(second_id)

# restart the old leader, so that we can maintain quorum for next iteration
self.cluster.restore_leader(container_id, 'connect')
time.sleep(1)

time.sleep(5)
retry(lambda: self.call("clean_up"))

# verify that all past rows are present on the new leader
for row_id in row_ids:
table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}")
self.assertIn(f"{row_id}", table)


# def test_drain_leader_node(self):
@@ -365,7 +391,7 @@ def test_prefer_leader(self):
"""This test moves leader replica to different node"""
self.add_me_as_admin()
cur_leader_node_id = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health()
self.cluster.ensure_leader_health(301)

replicas = self.cluster.get_all_replicas()
prefer_replica = {}
@@ -377,22 +403,27 @@
self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}")

next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id)
self.cluster.ensure_leader_health()
self.cluster.ensure_leader_health(302)
self.assertEqual(prefer_replica['node_id'], next_leader_node_id)
self.assertEqual(self.count_rows(), 2)


# verify that all past rows are present on the new leader
for row_id in [301, 302]:
table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}")
self.assertIn(f"{row_id}", table)


def test_a_many_transactions(self):
"""This test sends many messages to the database and verifies that they are all present"""
cur_leader = self.cluster.wait_for_leader_change(None)
num_messages = 1000
self.add(range(num_messages))
message_table = self.sql(f"SELECT text FROM message where text='{num_messages -1}'")
self.assertIn("999", message_table)
self.assertEqual(self.count_rows(), num_messages)
self.cluster.wait_for_leader_change(None)
num_messages = 10000
sub = self.subscribe("select * from counter", n=num_messages)
self.start(1, num_messages)

message_table = sub()[-1:]
self.assertIn({'counter': {'deletes': [{'id': 1, 'value': 9999}], 'inserts': [{'id': 1, 'value': 10000}]}}, message_table)


retry(lambda: self.call("clean_up"))
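The hard-coded 9999/10000 pair in the assertion above is simply `num_messages - 1` and `num_messages`; a hypothetical equivalent check (not part of this commit) that derives the expected final transaction update:

# Derive the expected last subscription update from num_messages instead of
# hard-coding the values (hypothetical; mirrors the assertIn above).
expected_last_update = {
    'counter': {
        'deletes': [{'id': 1, 'value': num_messages - 1}],
        'inserts': [{'id': 1, 'value': num_messages}],
    }
}
self.assertIn(expected_last_update, message_table)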

# def test_quorum_loss(self):
# """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes"""
