Skip to content

Commit

Permalink
leader prefer test
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubham8287 committed Feb 20, 2025
1 parent 33babcb commit 1870162
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 79 deletions.
12 changes: 0 additions & 12 deletions smoketests/root_config.toml

This file was deleted.

155 changes: 88 additions & 67 deletions smoketests/tests/replication.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,87 +292,108 @@ def count_rows(self):
return len(message_tb_raw.splitlines()) - 2


# def test_leader_election_in_loop(self):
# """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader"""
#
# for i in range(5):
# cur_leader = self.cluster.wait_for_leader_change(None)
# self.cluster.ensure_leader_health()
#
# print("killing current leader: {}", cur_leader)
# container_id = self.cluster.fail_leader()
#
#
# self.assertIsNotNone(container_id)
#
# next_leader = self.cluster.wait_for_leader_change(cur_leader)
# self.assertNotEqual(cur_leader, next_leader)
# # this check if leader election happened
# self.cluster.ensure_leader_health()
# self.assertEqual(self.count_rows(), 2 * (i+1))
# # restart the old leader, so that we can maintain quorum for next iteration
# self.cluster.restore_leader(container_id, 'start')
#
# time.sleep(5)
# retry(lambda: self.call("clean_up"))
#
#
# def test_leader_disconnect_in_loop(self):
# """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader"""
#
# for i in range(5):
# cur_leader = self.cluster.wait_for_leader_change(None)
# self.cluster.ensure_leader_health()
def test_leader_b_election_in_loop(self):
"""This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader"""

for i in range(5):
cur_leader = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health()

print("killing current leader: {}", cur_leader)
container_id = self.cluster.fail_leader()


self.assertIsNotNone(container_id)

next_leader = self.cluster.wait_for_leader_change(cur_leader)
self.assertNotEqual(cur_leader, next_leader)
# this check if leader election happened
self.cluster.ensure_leader_health()
self.assertEqual(self.count_rows(), 2 * (i+1))
# restart the old leader, so that we can maintain quorum for next iteration
self.cluster.restore_leader(container_id, 'start')

time.sleep(5)
retry(lambda: self.call("clean_up"))

def test_leader_c_disconnect_in_loop(self):
"""This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader"""

for i in range(5):
cur_leader = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health()

container_id = self.cluster.fail_leader('disconnect')

self.assertIsNotNone(container_id)

next_leader = self.cluster.wait_for_leader_change(cur_leader)
self.assertNotEqual(cur_leader, next_leader)
# this check if leader election happened
self.cluster.ensure_leader_health()
self.assertEqual(self.count_rows(), 2 * (i+1))

# restart the old leader, so that we can maintain quorum for next iteration
self.cluster.restore_leader(container_id, 'connect')
time.sleep(1)

time.sleep(5)
retry(lambda: self.call("clean_up"))



# def test_drain_leader_node(self):
# """This test moves leader replica to different node"""
# self.add_me_as_admin()
# cur_leader_node_id = self.cluster.wait_for_leader_change(None)
# self.cluster.ensure_leader_health()
#
# print("killing current leader: {}", cur_leader)
# container_id = self.cluster.fail_leader('disconnect')
#
# self.assertIsNotNone(container_id)
# replicas = self.cluster.get_all_replicas()
# empty_node_id = 14
# for replica in replicas:
# empty_node_id = empty_node_id - replica['node_id']
# self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}")
#
# next_leader = self.cluster.wait_for_leader_change(cur_leader)
# self.assertNotEqual(cur_leader, next_leader)
# # this check if leader election happened
# self.cluster.ensure_leader_health()
# self.assertEqual(self.count_rows(), 2 * (i+1))
#
# # restart the old leader, so that we can maintain quorum for next iteration
# self.cluster.restore_leader(container_id, 'connect')
#
# time.sleep(5)
# retry(lambda: self.call("clean_up"))
# self.cluster.ensure_leader_health()
# replicas = self.cluster.get_all_replicas()
# for replica in replicas:
# self.assertNotEqual(replica['node_id'], cur_leader_node_id)
#


def test_drain_leader_node(self):
def test_prefer_leader(self):
"""This test moves leader replica to different node"""
self.add_me_as_admin()
cur_leader_node_id = self.cluster.wait_for_leader_change(None)
self.cluster.ensure_leader_health()

replicas = self.cluster.get_all_replicas()
empty_node_id = 14
prefer_replica = {}
for replica in replicas:
empty_node_id = empty_node_id - replica['node_id']
self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}")
if replica['node_id'] != cur_leader_node_id:
prefer_replica = replica
break
prefer_replica_id = prefer_replica['replica_id']
self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}")

time.sleep(5)
next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id)
self.cluster.ensure_leader_health()
replicas = self.cluster.get_all_replicas()
for replica in replicas:
self.assertNotEqual(replica['node_id'], cur_leader_node_id)
# def test_many_transactions(self):
# """This test sends many messages to the database and verifies that they are all present"""
# cur_leader = self.wait_for_leader_change(None)
# num_messages = 1000
# self.add(range(num_messages+1))
# message_table = self.sql(f"SELECT text FROM message where text='{num_messages}'")
# self.assertIn("1000", message_table)
# self.assertEqual(self.count_rows(), num_messages)
#
# retry_on_error(lambda: self.call("clean_up"))
#
#
self.assertEqual(prefer_replica['node_id'], next_leader_node_id)
self.assertEqual(self.count_rows(), 2)


def test_a_many_transactions(self):
"""This test sends many messages to the database and verifies that they are all present"""
cur_leader = self.cluster.wait_for_leader_change(None)
num_messages = 1000
self.add(range(num_messages))
message_table = self.sql(f"SELECT text FROM message where text='{num_messages -1}'")
self.assertIn("999", message_table)
self.assertEqual(self.count_rows(), num_messages)


retry(lambda: self.call("clean_up"))

# def test_quorum_loss(self):
# """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes"""
#
Expand Down

0 comments on commit 1870162

Please sign in to comment.