From 18701626082e74c3656c132ecff51ac5fd2fc4fb Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 14 Feb 2025 22:56:17 +0530 Subject: [PATCH] leader prefer test --- smoketests/root_config.toml | 12 --- smoketests/tests/replication.py | 155 ++++++++++++++++++-------------- 2 files changed, 88 insertions(+), 79 deletions(-) delete mode 100644 smoketests/root_config.toml diff --git a/smoketests/root_config.toml b/smoketests/root_config.toml deleted file mode 100644 index 9874758f4e1..00000000000 --- a/smoketests/root_config.toml +++ /dev/null @@ -1,12 +0,0 @@ -default_server = "local" -spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwZTA0NmQ0YWUxM2M4NzQwMGI3ZDYyMWQyNjA3ZjBiYzYyNTgwZWExNzY0MmU3ZTcwNzg2M2U3ZTk1NmY3Iiwic3ViIjoicGxhY2Vob2xkZXItbm9kZS1pZCIsImlzcyI6ImxvY2FsaG9zdCIsImF1ZCI6W10sImlhdCI6MTczOTQ3MDY0OCwiZXhwIjoxNzM5NDcxMDA4fQ.Xh5zq5MPbdqWUv3Opt_PnuGkg4aCer_gg0mLNDLo5KDCZfdmTE3E9EJvBFcQcwloRcmWr_As9Hjbx2EnPrzItQ" - -[[server_configs]] -nickname = "local" -host = "127.0.0.1:3000" -protocol = "http" - -[[server_configs]] -nickname = "testnet" -host = "testnet.spacetimedb.com" -protocol = "https" diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 0693d4826ca..4f28a412dae 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -292,87 +292,108 @@ def count_rows(self): return len(message_tb_raw.splitlines()) - 2 -# def test_leader_election_in_loop(self): -# """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" -# -# for i in range(5): -# cur_leader = self.cluster.wait_for_leader_change(None) -# self.cluster.ensure_leader_health() -# -# print("killing current leader: {}", cur_leader) -# container_id = self.cluster.fail_leader() -# -# -# self.assertIsNotNone(container_id) -# -# next_leader = self.cluster.wait_for_leader_change(cur_leader) -# self.assertNotEqual(cur_leader, next_leader) -# # this check if leader election happened -# self.cluster.ensure_leader_health() -# self.assertEqual(self.count_rows(), 2 * (i+1)) -# # restart the old leader, so that we can maintain quorum for next iteration -# self.cluster.restore_leader(container_id, 'start') -# -# time.sleep(5) -# retry(lambda: self.call("clean_up")) -# -# -# def test_leader_disconnect_in_loop(self): -# """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" -# -# for i in range(5): -# cur_leader = self.cluster.wait_for_leader_change(None) -# self.cluster.ensure_leader_health() + def test_leader_b_election_in_loop(self): + """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + for i in range(5): + cur_leader = self.cluster.wait_for_leader_change(None) + self.cluster.ensure_leader_health() + + print("killing current leader: {}", cur_leader) + container_id = self.cluster.fail_leader() + + + self.assertIsNotNone(container_id) + + next_leader = self.cluster.wait_for_leader_change(cur_leader) + self.assertNotEqual(cur_leader, next_leader) + # this check if leader election happened + self.cluster.ensure_leader_health() + self.assertEqual(self.count_rows(), 2 * (i+1)) + # restart the old leader, so that we can maintain quorum for next iteration + self.cluster.restore_leader(container_id, 'start') + + time.sleep(5) + retry(lambda: self.call("clean_up")) + +def test_leader_c_disconnect_in_loop(self): + """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + for i in range(5): + cur_leader = self.cluster.wait_for_leader_change(None) + self.cluster.ensure_leader_health() + + container_id = self.cluster.fail_leader('disconnect') + + self.assertIsNotNone(container_id) + + next_leader = self.cluster.wait_for_leader_change(cur_leader) + self.assertNotEqual(cur_leader, next_leader) + # this check if leader election happened + self.cluster.ensure_leader_health() + self.assertEqual(self.count_rows(), 2 * (i+1)) + + # restart the old leader, so that we can maintain quorum for next iteration + self.cluster.restore_leader(container_id, 'connect') + time.sleep(1) + + time.sleep(5) + retry(lambda: self.call("clean_up")) + + + +# def test_drain_leader_node(self): +# """This test moves leader replica to different node""" +# self.add_me_as_admin() +# cur_leader_node_id = self.cluster.wait_for_leader_change(None) +# self.cluster.ensure_leader_health() # -# print("killing current leader: {}", cur_leader) -# container_id = self.cluster.fail_leader('disconnect') -# -# self.assertIsNotNone(container_id) +# replicas = self.cluster.get_all_replicas() +# empty_node_id = 14 +# for replica in replicas: +# empty_node_id = empty_node_id - replica['node_id'] +# self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}") # -# next_leader = self.cluster.wait_for_leader_change(cur_leader) -# self.assertNotEqual(cur_leader, next_leader) -# # this check if leader election happened -# self.cluster.ensure_leader_health() -# self.assertEqual(self.count_rows(), 2 * (i+1)) -# -# # restart the old leader, so that we can maintain quorum for next iteration -# self.cluster.restore_leader(container_id, 'connect') -# # time.sleep(5) -# retry(lambda: self.call("clean_up")) +# self.cluster.ensure_leader_health() +# replicas = self.cluster.get_all_replicas() +# for replica in replicas: +# self.assertNotEqual(replica['node_id'], cur_leader_node_id) # - - def test_drain_leader_node(self): + def test_prefer_leader(self): """This test moves leader replica to different node""" self.add_me_as_admin() cur_leader_node_id = self.cluster.wait_for_leader_change(None) self.cluster.ensure_leader_health() replicas = self.cluster.get_all_replicas() - empty_node_id = 14 + prefer_replica = {} for replica in replicas: - empty_node_id = empty_node_id - replica['node_id'] - self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}") + if replica['node_id'] != cur_leader_node_id: + prefer_replica = replica + break + prefer_replica_id = prefer_replica['replica_id'] + self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}") - time.sleep(5) + next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id) self.cluster.ensure_leader_health() - replicas = self.cluster.get_all_replicas() - for replica in replicas: - self.assertNotEqual(replica['node_id'], cur_leader_node_id) - -# def test_many_transactions(self): -# """This test sends many messages to the database and verifies that they are all present""" -# cur_leader = self.wait_for_leader_change(None) -# num_messages = 1000 -# self.add(range(num_messages+1)) -# message_table = self.sql(f"SELECT text FROM message where text='{num_messages}'") -# self.assertIn("1000", message_table) -# self.assertEqual(self.count_rows(), num_messages) -# -# retry_on_error(lambda: self.call("clean_up")) -# -# + self.assertEqual(prefer_replica['node_id'], next_leader_node_id) + self.assertEqual(self.count_rows(), 2) + + + def test_a_many_transactions(self): + """This test sends many messages to the database and verifies that they are all present""" + cur_leader = self.cluster.wait_for_leader_change(None) + num_messages = 1000 + self.add(range(num_messages)) + message_table = self.sql(f"SELECT text FROM message where text='{num_messages -1}'") + self.assertIn("999", message_table) + self.assertEqual(self.count_rows(), num_messages) + + + retry(lambda: self.call("clean_up")) + # def test_quorum_loss(self): # """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes""" #