From 7c39d87bc97747b6a9d0b4e3e268567d5f630066 Mon Sep 17 00:00:00 2001 From: Kim Altintop Date: Wed, 13 Nov 2024 14:35:03 +0100 Subject: [PATCH 1/8] smoketests: Use docker compose logs if compose file is specified --- smoketests/__main__.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/smoketests/__main__.py b/smoketests/__main__.py index 035b3e6b059..b82683aa962 100644 --- a/smoketests/__main__.py +++ b/smoketests/__main__.py @@ -57,6 +57,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("test", nargs="*", default=tests) parser.add_argument("--docker", action="store_true") + parser.add_argument("--compose-file") parser.add_argument("--skip-dotnet", action="store_true", help="ignore tests which require dotnet") parser.add_argument("--show-all-output", action="store_true", help="show all stdout/stderr from the tests as they're running") parser.add_argument("--parallel", action="store_true", help="run test classes in parallel") @@ -75,9 +76,12 @@ def main(): build_template_target() if args.docker: - docker_container = check_docker() # have docker logs print concurrently with the test output - subprocess.Popen(["docker", "logs", "-f", docker_container]) + if args.compose_file: + subprocess.Popen(["docker", "compose", "-f", args.compose_file, "logs", "-f"]) + else: + docker_container = check_docker() + subprocess.Popen(["docker", "logs", "-f", docker_container]) smoketests.HAVE_DOCKER = True smoketests.new_identity(TEST_DIR / 'config.toml') From 4250ef2d25e37323d5902a283a8be7ee55687878 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Mon, 20 Jan 2025 12:48:54 +0530 Subject: [PATCH 2/8] smoketest: use compose file for restart --- smoketests/__init__.py | 3 +++ smoketests/__main__.py | 3 ++- smoketests/tests/zz_docker.py | 7 ++++--- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/smoketests/__init__.py b/smoketests/__init__.py index 883ce063df1..be35ded6451 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -35,6 +35,9 @@ # and a dotnet installation is detected HAVE_DOTNET = False +# default value can be overriden by `--compose-file` flag +COMPOSE_FILE = "./docker-compose.yml" + # we need to late-bind the output stream to allow unittests to capture stdout/stderr. class CapturableHandler(logging.StreamHandler): diff --git a/smoketests/__main__.py b/smoketests/__main__.py index b82683aa962..5a3b97efd3d 100644 --- a/smoketests/__main__.py +++ b/smoketests/__main__.py @@ -15,7 +15,7 @@ def check_docker(): docker_ps = smoketests.run_cmd("docker", "ps", "--format=json") docker_ps = (json.loads(line) for line in docker_ps.splitlines()) for docker_container in docker_ps: - if "node" in docker_container["Image"]: + if "node" in docker_container["Image"] or "spacetime" in docker_container["Image"]: return docker_container["Names"] else: print("Docker container not found, is SpacetimeDB running?") @@ -79,6 +79,7 @@ def main(): # have docker logs print concurrently with the test output if args.compose_file: subprocess.Popen(["docker", "compose", "-f", args.compose_file, "logs", "-f"]) + smoketests.COMPOSE_FILE = args.compose_file else: docker_container = check_docker() subprocess.Popen(["docker", "logs", "-f", docker_container]) diff --git a/smoketests/tests/zz_docker.py b/smoketests/tests/zz_docker.py index 58ded478d0c..ac48077448f 100644 --- a/smoketests/tests/zz_docker.py +++ b/smoketests/tests/zz_docker.py @@ -1,21 +1,22 @@ import time -from .. import Smoketest, run_cmd, requires_docker +from .. 
import Smoketest, run_cmd, requires_docker, COMPOSE_FILE from urllib.request import urlopen from .add_remove_index import AddRemoveIndex def restart_docker(): + # Behold! # # You thought stop/start restarts? How wrong. Restart restarts. - run_cmd("docker", "compose", "restart") + run_cmd("docker", "compose", "-f", COMPOSE_FILE, "restart") # The suspense! # # Wait until compose believes the health probe succeeds. # # The container may decide to recompile, or grab a coffee at crates.io, or # whatever. In any case, restart doesn't mean the server is up yet. - run_cmd("docker", "compose", "up", "--no-recreate", "--detach", "--wait-timeout", "60") + run_cmd("docker", "compose","-f", COMPOSE_FILE, "up", "--no-recreate", "--detach", "--wait-timeout", "60") # Belts and suspenders! # # The health probe runs inside the container, but that doesn't mean we can From 34a9058cd0ab9b3832fa8824f783e33ee21ed0b7 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Tue, 21 Jan 2025 17:13:23 +0530 Subject: [PATCH 3/8] leader failure smoketest --- smoketests/__init__.py | 40 ++++++++++++++++++++++ smoketests/tests/replication.py | 60 +++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 smoketests/tests/replication.py diff --git a/smoketests/__init__.py b/smoketests/__init__.py index a95c41edab4..bc3ad61e66b 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -164,6 +164,46 @@ def cargo_manifest(cls, manifest_text): def spacetime(cls, *args, **kwargs): return spacetime("--config-path", str(cls.config_path), *args, **kwargs) + def read_controldb(self, sql): + return self.spacetime("sql", "spacetime-control", sql) + + + def leader_node(self): + """ + returns `network_addr` field of node which hosts leader replica of database + """ + self._check_published() + def get_int(text): + return int(re.search(r'\d+', text).group()) + + sql = f"select id from database where database_identity=0x{self.database_identity}" + db_id_tb = self.read_controldb(sql) + database_id = get_int(db_id_tb); + + + sql = f"select leader from replication_state where database_id={database_id}" + leader_tb = self.read_controldb(sql) + leader_id = get_int(leader_tb) + + + sql = f"select node_id from replica where id={leader_id}" + leader_node_tb = self.read_controldb(sql) + leader_node_id = get_int(leader_node_tb) + + sql = f"select network_addr from node where id={leader_node_id}" + leader_host_tb = self.read_controldb(sql) + lines = leader_host_tb.splitlines() + print("lines", lines) + if len(lines) != 3: + return None + leader_row = lines[2] + # Check if the line contains the network address + if "(some =" in leader_row: + address = leader_row.split('"')[1] + hostname = address.split(':')[0] + return hostname + return None + def _check_published(self): if not hasattr(self, "database_identity"): raise Exception("Cannot use this function without publishing a module") diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py new file mode 100644 index 00000000000..a53c1fa8e4f --- /dev/null +++ b/smoketests/tests/replication.py @@ -0,0 +1,60 @@ +import time +from .. import Smoketest, run_cmd, requires_docker +from .zz_docker import restart_docker + + +def kill_node_container(name_substr): + """ + Stop the first Docker container whose name contains the given substring. 
+ + :param name_substr: Substring to match in container names + """ + container_list = run_cmd("docker", "ps", "--format", "{{.ID}} {{.Names}}") + + if container_list is None: + return + + for line in container_list.splitlines(): + container_id, container_name = line.split(maxsplit=1) + if name_substr in container_name: + result = run_cmd("docker", "stop", container_id) + if result is not None: + print(f"Container '{container_name}' has been killed.") + else: + print(f"Failed to kill container '{container_name}'.") + break + + +@requires_docker +class ReplicationTest(Smoketest): + MODULE_CODE = """ +use spacetimedb::{ReducerContext, Table}; + +#[spacetimedb::table(name = message, public)] +pub struct Message { + #[primary_key] + #[auto_inc] + id: u64, + text: String, +} + +#[spacetimedb::reducer] +fn send_message(ctx: &ReducerContext, text: String) { + ctx.db.message().insert(Message {id:0, text}); +} + +""" + def test_leader_failure(self): + """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + self.call("send_message", "hey") + leader = self.leader_node(); + kill_node_container(leader) + + time.sleep(2) + + sub = self.subscribe("SELECT * FROM message", n=1) + self.call("send_message", "joey") + + self.assertEqual(sub(), [{'scheduled_table': {'deletes': [], 'inserts': [{"id":1,"text":"hey"}, {"id":2,"text":"joey"}]}}]) + From 4454eb7d402064d0e8b0622f250e9eecf915bb2c Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 5 Feb 2025 00:10:11 +0530 Subject: [PATCH 4/8] decide backlog test --- smoketests/tests/replication.py | 63 +++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index ccf207ac284..9cd8dbb9dc7 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -27,8 +27,8 @@ def retry_on_error(func, max_retries=3, retry_delay=2): print(f"Attempt {attempt} failed: {e}. Retrying in {retry_delay} seconds...") time.sleep(retry_delay) else: - print("Max retries reached. Raising the exception.") - raise + print("Max retries reached. 
Skipping the exception.") + return False @requires_docker class ReplicationTest(Smoketest): @@ -50,26 +50,26 @@ class ReplicationTest(Smoketest): """ - def test_leader_failure(self): - """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" - - self.call("send_message", "hey") - leader = self.leader_node() - containers = list_container() - for container in containers: - if leader in container: - kill_node_container(container.split()[0]) - break - time.sleep(2) - - self.call("send_message", "joey") - - message_table = self.sql("SELECT * FROM message") - restart_docker() - time.sleep(2) - self.assertIn("hey", message_table) - self.assertIn("joey", message_table) - +# def test_leader_failure(self): +# """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" +# +# self.call("send_message", "hey") +# leader = self.leader_node() +# containers = list_container() +# for container in containers: +# if leader in container: +# kill_node_container(container.split()[0]) +# break +# time.sleep(2) +# +# self.call("send_message", "joey") +# +# message_table = self.sql("SELECT * FROM message") +# restart_docker() +# time.sleep(2) +# self.assertIn("hey", message_table) +# self.assertIn("joey", message_table) +# def test_many_transactions(self): """This test sends many messages to the database and verifies that they are all present""" @@ -81,4 +81,23 @@ def test_many_transactions(self): self.assertIn("1000", message_table) + def test_quorum_loss(self): + """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes""" + + for i in range(11): + retry_on_error(lambda: self.call("send_message", f"{i}")) + + leader = self.leader_node() + containers = list_container() + for container in containers: + if leader not in container and "worker" in container: + kill_node_container(container.split()[0]) + + time.sleep(2) + for i in range(1001): + if retry_on_error(lambda: self.call("send_message", f"{i}")) == False: + break + + self.assertTrue(i > 1 and i < 1000, f"leader should stop accpeting writes between 1 and 1000, got {i}") + From 10152a2a0f679956bfe8ef13329de99469e9a0da Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 5 Feb 2025 20:51:35 +0530 Subject: [PATCH 5/8] add network disconnect test --- smoketests/__init__.py | 38 ------ smoketests/tests/replication.py | 221 ++++++++++++++++++++++++-------- 2 files changed, 170 insertions(+), 89 deletions(-) diff --git a/smoketests/__init__.py b/smoketests/__init__.py index 7b519554480..5d4e7bbbb0b 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -168,44 +168,6 @@ def read_controldb(self, sql): return self.spacetime("sql", "spacetime-control", sql) - def leader_node(self): - """ - returns `network_addr` field of node which hosts leader replica of database - """ - self._check_published() - def get_int(text): - return int(re.search(r'\d+', text).group()) - - sql = f"select id from database where database_identity=0x{self.database_identity}" - db_id_tb = self.read_controldb(sql) - database_id = get_int(db_id_tb); - - - sql = f"select leader from replication_state where database_id={database_id}" - leader_tb = self.read_controldb(sql) - leader_id = get_int(leader_tb) - - - sql = f"select node_id from replica where id={leader_id}" - leader_node_tb = self.read_controldb(sql) - leader_node_id = get_int(leader_node_tb) - - sql = f"select network_addr from node where id={leader_node_id}" - 
leader_host_tb = self.read_controldb(sql) - lines = leader_host_tb.splitlines() - - # actual row starts from 3rd line - if len(lines) != 3: - return None - - leader_row = lines[2] - # Check if the line contains the network address - if "(some =" in leader_row: - address = leader_row.split('"')[1] - hostname = address.split(':')[0] - return hostname - return None - def _check_published(self): if not hasattr(self, "database_identity"): raise Exception("Cannot use this function without publishing a module") diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 9cd8dbb9dc7..445bd3bf4ad 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -1,21 +1,26 @@ -import time +import time, random, re from .. import COMPOSE_FILE, Smoketest, run_cmd, requires_docker from .zz_docker import restart_docker - def list_container(): - container_list = run_cmd("docker", "compose", "-f", COMPOSE_FILE, "ps", "--format", "{{.ID}} {{.Name}}") + container_list = run_cmd("docker", "compose", "-f", COMPOSE_FILE, "ps", "-a", "--format", "{{.ID}} {{.Name}}") return container_list.splitlines() if container_list is not None else [] -def kill_node_container(container_id): - """ - Stop the first Docker container whose name contains the given substring. - - :param name_substr: Substring to match in container names - """ +def kill_container(container_id): run_cmd("docker", "kill", container_id) +def disconnect_container(container_id): + print(f"Disconnecting container {container_id}") + run_cmd("docker", "network", "disconnect", "private_spacetime_cloud", container_id) + print(f"Disconnected container {container_id}") + +def start_container(container_id): + run_cmd("docker", "start", container_id) +def connect_container(container_id): + print(f"Connecting container {container_id}") + run_cmd("docker", "network", "connect", "private_spacetime_cloud", container_id) + print(f"Connected container {container_id}") def retry_on_error(func, max_retries=3, retry_delay=2): """Helper to retry a function on error.""" @@ -30,10 +35,11 @@ def retry_on_error(func, max_retries=3, retry_delay=2): print("Max retries reached. 
Skipping the exception.") return False + @requires_docker class ReplicationTest(Smoketest): MODULE_CODE = """ -use spacetimedb::{ReducerContext, Table}; +use spacetimedb::{ReducerContext, Table, log}; #[spacetimedb::table(name = message, public)] pub struct Message { @@ -44,60 +50,173 @@ class ReplicationTest(Smoketest): } #[spacetimedb::reducer] -fn send_message(ctx: &ReducerContext, text: String) { +fn add(ctx: &ReducerContext, text: String) { + log::info!("adding message: {}", text); ctx.db.message().insert(Message {id:0, text}); } + +#[spacetimedb::reducer] +fn clean_up(ctx: &ReducerContext) { + log::info!("cleaning up messages"); + ctx.db.message().iter().for_each(|m| {ctx.db.message().delete(m); }); +} """ -# def test_leader_failure(self): -# """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" -# -# self.call("send_message", "hey") -# leader = self.leader_node() -# containers = list_container() -# for container in containers: -# if leader in container: -# kill_node_container(container.split()[0]) -# break -# time.sleep(2) -# -# self.call("send_message", "joey") -# -# message_table = self.sql("SELECT * FROM message") -# restart_docker() -# time.sleep(2) -# self.assertIn("hey", message_table) -# self.assertIn("joey", message_table) -# + def ensure_working_cluster(self, wait=False): + """Ensure that the cluster is up and running.""" + if wait: + time.sleep(2) + + rnd = random.randint(9000, 10000) + retry_on_error(lambda: self.call("add", f"{rnd}")) + add_table = self.sql(f"SELECT id, text FROM message where text='{rnd}'") + self.assertIn(str(rnd), add_table) + + def add(self, r: range): + """Send a message to the database.""" + for i in r: + retry_on_error(lambda: self.call("add", f"{i}")) + + def count_rows(self): + message_tb_raw = self.sql("SELECT id FROM message") + # -2 to remove header + return len(message_tb_raw.splitlines()) - 2 + + + def leader_node(self): + """ + returns `network_addr` field of node which hosts leader replica of database + `network_addr` is use to pattern match with container name + """ + self._check_published() + def get_int(text): + return int(re.search(r'\d+', text).group()) - def test_many_transactions(self): - """This test sends many messages to the database and verifies that they are all present""" + sql = f"select id from database where database_identity=0x{self.database_identity}" + db_id_tb = self.read_controldb(sql) + database_id = get_int(db_id_tb); - num_messages = 1000 - for i in range(num_messages+1): - retry_on_error(lambda: self.call("send_message", f"{i}")) - message_table = self.sql(f"SELECT text FROM message where text='{num_messages}'") - self.assertIn("1000", message_table) + sql = f"select leader from replication_state where database_id={database_id}" + leader_tb = self.read_controldb(sql) + leader_id = get_int(leader_tb) - def test_quorum_loss(self): - """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes""" - for i in range(11): - retry_on_error(lambda: self.call("send_message", f"{i}")) + sql = f"select node_id from replica where id={leader_id}" + leader_node_tb = self.read_controldb(sql) + leader_node_id = get_int(leader_node_tb) + + sql = f"select network_addr from node where id={leader_node_id}" + leader_host_tb = self.read_controldb(sql) + lines = leader_host_tb.splitlines() + # actual row starts from 3rd line + if len(lines) != 3: + return None + + leader_row = lines[2] + # Check if the line contains the network 
address + if "(some =" in leader_row: + address = leader_row.split('"')[1] + hostname = address.split(':')[0] + return hostname + return None + + + def get_leader_container_id(self): + """Kill current leader container and return its""" leader = self.leader_node() containers = list_container() for container in containers: - if leader not in container and "worker" in container: - kill_node_container(container.split()[0]) - - time.sleep(2) - for i in range(1001): - if retry_on_error(lambda: self.call("send_message", f"{i}")) == False: - break + if leader in container: + container_id = container.split()[0] + return container_id + return None + + def wait_for_leader_change(self, leader): + """Wait for leader to change""" + for i in range(10): + time.sleep(2) + next_leader = self.leader_node() + if next_leader != leader: + return next_leader + return None + + def test_leader_election_in_loop(self): + """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + for i in range(5): + cur_leader = self.wait_for_leader_change(None) + self.ensure_working_cluster(True) + print("killing current leader: {}", cur_leader) + cur_leader_id = self.get_leader_container_id() + kill_container(cur_leader_id) + self.assertIsNotNone(cur_leader_id) + + next_leader = self.wait_for_leader_change(cur_leader) + self.assertNotEqual(cur_leader, next_leader) + # this check if leader election happened + self.ensure_working_cluster(True) + self.assertEqual(self.count_rows(), 2 * (i+1)) + # restart the old leader, so that we can maintain quorum for next iteration + start_container(cur_leader_id) + + time.sleep(5) + retry_on_error(lambda: self.call("clean_up")) + + + def test_leader_disconnect_in_loop(self): + """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + for i in range(5): + cur_leader = self.wait_for_leader_change(None) + self.ensure_working_cluster(True) + cur_leader_id = self.get_leader_container_id() + disconnect_container(cur_leader_id) + self.assertIsNotNone(cur_leader_id) + + next_leader = self.wait_for_leader_change(cur_leader) + self.assertNotEqual(cur_leader, next_leader) + # this check if leader election happened + self.ensure_working_cluster(True) + self.assertEqual(self.count_rows(), 2 * (i+1)) - self.assertTrue(i > 1 and i < 1000, f"leader should stop accpeting writes between 1 and 1000, got {i}") - + # restart the old leader, so that we can maintain quorum for next iteration + connect_container(cur_leader_id) + + time.sleep(5) + retry_on_error(lambda: self.call("clean_up")) + +# def test_many_transactions(self): +# """This test sends many messages to the database and verifies that they are all present""" +# cur_leader = self.wait_for_leader_change(None) +# num_messages = 1000 +# self.add(range(num_messages+1)) +# message_table = self.sql(f"SELECT text FROM message where text='{num_messages}'") +# self.assertIn("1000", message_table) +# self.assertEqual(self.count_rows(), num_messages) +# +# retry_on_error(lambda: self.call("clean_up")) +# +# +# def test_quorum_loss(self): +# """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes""" +# +# for i in range(11): +# retry_on_error(lambda: self.call("send_message", f"{i}")) +# +# leader = self.leader_node() +# containers = list_container() +# for container in containers: +# if leader not in container and "worker" in container: +# kill_node_container(container.split()[0]) +# +# 
time.sleep(2) +# for i in range(1001): +# if retry_on_error(lambda: self.call("send_message", f"{i}")) == False: +# break +# +# self.assertTrue(i > 1 and i < 1000, f"leader should stop accpeting writes between 1 and 1000, got {i}") +# From 17d082db094faef29c571fbd805079abec75fdf6 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Tue, 11 Feb 2025 22:08:28 +0530 Subject: [PATCH 6/8] root admin --- smoketests/__init__.py | 5 +- smoketests/config.toml | 2 +- smoketests/root_config.toml | 12 + smoketests/tests/replication.py | 425 ++++++++++++++++++++++---------- 4 files changed, 313 insertions(+), 131 deletions(-) create mode 100644 smoketests/root_config.toml diff --git a/smoketests/__init__.py b/smoketests/__init__.py index 5d4e7bbbb0b..d39da0540ac 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -164,10 +164,6 @@ def cargo_manifest(cls, manifest_text): def spacetime(cls, *args, **kwargs): return spacetime("--config-path", str(cls.config_path), *args, **kwargs) - def read_controldb(self, sql): - return self.spacetime("sql", "spacetime-control", sql) - - def _check_published(self): if not hasattr(self, "database_identity"): raise Exception("Cannot use this function without publishing a module") @@ -192,6 +188,7 @@ def log_records(self, n): return list(map(json.loads, logs.splitlines())) def publish_module(self, domain=None, *, clear=True, capture_stderr=True): + print("publishing module", self.publish_module) publish_output = self.spacetime( "publish", *[domain] if domain is not None else [], diff --git a/smoketests/config.toml b/smoketests/config.toml index cd91a9c8a31..c532e68b9b7 100644 --- a/smoketests/config.toml +++ b/smoketests/config.toml @@ -1,5 +1,5 @@ default_server = "127.0.0.1:3000" -spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwODIzOTA3M2M5MDgyNWQxZWY0MWVjNGJlYzg1MmNkNWIzOTdiMzBjZTVhZjUzNmZlZGExOTE3OWM5ZTJjIiwic3ViIjoiMzVmMjQ5ZTYtZjI0NC00ZDE1LWIzYmUtM2Q5NWZjMjA4MTFmIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTczMjYzODI4NywiZXhwIjpudWxsfQ.oVIaYaH7w8ZiuowAflzKo4BrUeGk_1WqlaySMCYqIrkzB96SxVjCQuR0PYM8dOs7WhsiXvYH7dgVxbSbVV4PGg" +spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwNTE4ODlhODY1OGU2ZDA0ZGE2NzRjYTI3MTgzMjFlN2Y3YmMyNWYxYzA0MzZkZTRjNDE3NDFlYmY3MTEzIiwic3ViIjoiZjQ0MzgwOGUtNjdhOS00N2NjLTgyMmItM2ZhZDhjZmQxOTA5IiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTczOTUzMjg5NSwiZXhwIjpudWxsfQ.Lu4553s68Ibb_EGdbamJ3EPTYaztd6tXg0jiZM6E3dhNmKYLeFrBYK7vsc0az2q_rROEtx2ZWHna3WjqF88kuA" [[server_configs]] nickname = "localhost" diff --git a/smoketests/root_config.toml b/smoketests/root_config.toml new file mode 100644 index 00000000000..9874758f4e1 --- /dev/null +++ b/smoketests/root_config.toml @@ -0,0 +1,12 @@ +default_server = "local" +spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwZTA0NmQ0YWUxM2M4NzQwMGI3ZDYyMWQyNjA3ZjBiYzYyNTgwZWExNzY0MmU3ZTcwNzg2M2U3ZTk1NmY3Iiwic3ViIjoicGxhY2Vob2xkZXItbm9kZS1pZCIsImlzcyI6ImxvY2FsaG9zdCIsImF1ZCI6W10sImlhdCI6MTczOTQ3MDY0OCwiZXhwIjoxNzM5NDcxMDA4fQ.Xh5zq5MPbdqWUv3Opt_PnuGkg4aCer_gg0mLNDLo5KDCZfdmTE3E9EJvBFcQcwloRcmWr_As9Hjbx2EnPrzItQ" + +[[server_configs]] +nickname = "local" +host = "127.0.0.1:3000" +protocol = "http" + +[[server_configs]] +nickname = "testnet" +host = "testnet.spacetimedb.com" +protocol = "https" diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 445bd3bf4ad..0693d4826ca 100644 --- a/smoketests/tests/replication.py +++ 
b/smoketests/tests/replication.py @@ -1,29 +1,106 @@ import time, random, re -from .. import COMPOSE_FILE, Smoketest, run_cmd, requires_docker +from .. import COMPOSE_FILE, STDB_DIR, TEST_DIR, Smoketest, run_cmd, requires_docker, spacetime from .zz_docker import restart_docker -def list_container(): - container_list = run_cmd("docker", "compose", "-f", COMPOSE_FILE, "ps", "-a", "--format", "{{.ID}} {{.Name}}") - return container_list.splitlines() if container_list is not None else [] +import os +from dataclasses import dataclass +from typing import List, Optional, Callable +import subprocess +import time +from functools import wraps -def kill_container(container_id): - run_cmd("docker", "kill", container_id) +@dataclass +class DockerContainer: + """Represents a Docker container with its basic properties.""" + id: str + name: str -def disconnect_container(container_id): - print(f"Disconnecting container {container_id}") - run_cmd("docker", "network", "disconnect", "private_spacetime_cloud", container_id) - print(f"Disconnected container {container_id}") +class DockerManager: + """Manages all Docker and Docker Compose operations.""" + + def __init__(self, compose_file: str, **config): + self.compose_file = compose_file + self.network_name = config.get('network_name') or \ + os.getenv('DOCKER_NETWORK_NAME', 'private_spacetime_cloud') + self.control_db_container = config.get('control_db_container') or \ + os.getenv('CONTROL_DB_CONTAINER', 'node') + self.spacetime_cli_bin = config.get('spacetime_cli_bin') or \ + os.getenv('SPACETIME_CLI_BIN', 'spacetimedb-cloud') + def _execute_command(self, *args: str) -> str: + """Execute a Docker command and return its output.""" + try: + result = subprocess.run( + args, + capture_output=True, + text=True, + check=True + ) + return result.stdout.strip() + except subprocess.CalledProcessError as e: + print(f"Command failed: {e.stderr}") + raise + except Exception as e: + print(f"Unexpected error: {str(e)}") + raise + + def compose(self, *args: str) -> str: + """Execute a docker-compose command.""" + return self._execute_command("docker", "compose", "-f", self.compose_file, *args) + + def list_containers(self) -> List[DockerContainer]: + """List all containers and return as DockerContainer objects.""" + output = self.compose("ps", "-a", "--format", "{{.ID}} {{.Name}}") + containers = [] + for line in output.splitlines(): + if line.strip(): + container_id, name = line.split(maxsplit=1) + containers.append(DockerContainer(id=container_id, name=name)) + return containers + + def get_container_by_name(self, name: str) -> Optional[DockerContainer]: + """Find a container by name pattern.""" + return next( + (c for c in self.list_containers() if name in c.name), + None + ) -def start_container(container_id): - run_cmd("docker", "start", container_id) + def kill_container(self, container_id: str): + """Kill a container by ID.""" + print(f"Killing container {container_id}") + self._execute_command("docker", "kill", container_id) -def connect_container(container_id): - print(f"Connecting container {container_id}") - run_cmd("docker", "network", "connect", "private_spacetime_cloud", container_id) - print(f"Connected container {container_id}") + def start_container(self, container_id: str): + """Start a container by ID.""" + print(f"Starting container {container_id}") + self._execute_command("docker", "start", container_id) -def retry_on_error(func, max_retries=3, retry_delay=2): - """Helper to retry a function on error.""" + def disconnect_container(self, container_id: 
str): + """Disconnect a container from the network.""" + print(f"Disconnecting container {container_id}") + self._execute_command( + "docker", "network", "disconnect", + self.network_name, container_id + ) + print(f"Disconnected container {container_id}") + + def connect_container(self, container_id: str): + """Connect a container to the network.""" + print(f"Connecting container {container_id}") + self._execute_command( + "docker", "network", "connect", + self.network_name, container_id + ) + print(f"Connected container {container_id}") + + def generate_root_token(self) -> str: + """Generate a root token using spacetimedb-cloud.""" + return self.compose( + "exec", self.control_db_container, self.spacetime_cli_bin, "token", "gen", + "--subject=placeholder-node-id", + "--jwt-priv-key", "/etc/spacetimedb/keys/id_ecdsa").split('|')[1] + +def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2): + """Retry a function on failure with delay.""" for attempt in range(1, max_retries + 1): try: return func() @@ -35,6 +112,128 @@ def retry_on_error(func, max_retries=3, retry_delay=2): print("Max retries reached. Skipping the exception.") return False +def get_int(text): + return int(re.search(r'\d+', text).group()) + +class Cluster: + """Manages leader-related operations and state for SpaceTime database cluster.""" + + def __init__(self, docker_manager, smoketest: Smoketest): + self.docker = docker_manager + self.test = smoketest + + def read_controldb(self, sql): + """Helper method to read from control database.""" + return self.test.spacetime("sql", "spacetime-control", sql) + + + + def get_db_id(self): + # Query database ID + sql = f"select id from database where database_identity=0x{self.test.database_identity}" + db_id_tb = self.read_controldb(sql) + return get_int(db_id_tb) + + + def get_all_replicas(self): + """Get all replica nodes in the cluster.""" + database_id = self.get_db_id() + sql = f"select id, node_id from replica where database_id={database_id}" + replica_tb = self.read_controldb(sql) + replicas = [] + for line in replica_tb.splitlines()[2:]: + replica_id, node_id = line.split('|') + replicas.append({ + 'replica_id': int(replica_id), + 'node_id': int(node_id) + }) + return replicas + + def get_leader_info(self): + """Get current leader's node information including ID, hostname, and container ID.""" + + database_id = self.get_db_id() + # Query leader replica ID + sql = f"select leader from replication_state where database_id={database_id}" + leader_tb = self.read_controldb(sql) + leader_id = get_int(leader_tb) + + # Query leader node ID + sql = f"select node_id from replica where id={leader_id}" + leader_node_tb = self.read_controldb(sql) + leader_node_id = get_int(leader_node_tb) + + # Query leader hostname + sql = f"select network_addr from node where id={leader_node_id}" + leader_host_tb = str(self.read_controldb(sql)) + lines = leader_host_tb.splitlines() + + hostname = "" + if len(lines) == 3: # actual row starts from 3rd line + leader_row = lines[2] + if "(some =" in leader_row: + address = leader_row.split('"')[1] + hostname = address.split(':')[0] + + # Find container ID + container_id = "" + containers = self.docker.list_containers() + for container in containers: + if hostname in container.name: + container_id = container.id + break + + return { + 'node_id': leader_node_id, + 'hostname': hostname, + 'container_id': container_id + } + + def wait_for_leader_change(self, previous_leader_node, max_attempts=10, delay=2): + """Wait for leader to change and return 
new leader node_id.""" + for _ in range(max_attempts): + current_leader = self.get_leader_info()['node_id'] + if current_leader != previous_leader_node: + return current_leader + time.sleep(delay) + return None + + def ensure_leader_health(self, wait_time=2): + """Verify leader is healthy by testing basic operations.""" + if wait_time: + time.sleep(wait_time) + + rnd = random.randint(9000, 10000) + retry(lambda: self.test.call("add", f"{rnd}")) + add_table = str(self.test.sql(f"SELECT id, text FROM message where text='{rnd}'")) + return str(rnd) in add_table + + def fail_leader(self, action='kill'): + """Force leader failure through either killing or network disconnect.""" + leader_info = self.get_leader_info() + container_id = leader_info['container_id'] + hostname = leader_info['hostname'] + + if not container_id: + raise ValueError("Could not find leader container") + + if action == 'kill': + self.docker.kill_container(container_id) + elif action == 'disconnect': + self.docker.disconnect_container(container_id) + else: + raise ValueError(f"Unknown action: {action}") + + return container_id + + def restore_leader(self, container_id, action='start'): + """Restore failed leader through either starting or network reconnect.""" + if action == 'start': + self.docker.start_container(container_id) + elif action == 'connect': + self.docker.connect_container(container_id) + else: + raise ValueError(f"Unknown action: {action}") @requires_docker class ReplicationTest(Smoketest): @@ -63,130 +262,104 @@ class ReplicationTest(Smoketest): } """ - def ensure_working_cluster(self, wait=False): - """Ensure that the cluster is up and running.""" - if wait: - time.sleep(2) + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.root_config = cls.project_path / "root_config" - rnd = random.randint(9000, 10000) - retry_on_error(lambda: self.call("add", f"{rnd}")) - add_table = self.sql(f"SELECT id, text FROM message where text='{rnd}'") - self.assertIn(str(rnd), add_table) + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self.docker = DockerManager(COMPOSE_FILE) + self.root_token = self.docker.generate_root_token() + + self.cluster = Cluster(self.docker, self) + + def add_me_as_admin(self): + """Add the current user as an admin account""" + db_owner_id = self.spacetime("login", "show").split()[-1] + spacetime("--config-path", self.root_config, "login", "--token", self.root_token) + spacetime("--config-path", self.root_config, "call", "spacetime-control", "create_admin_account", f"0x{db_owner_id}") def add(self, r: range): """Send a message to the database.""" for i in r: - retry_on_error(lambda: self.call("add", f"{i}")) + retry(lambda: self.call("add", f"{i}")) def count_rows(self): - message_tb_raw = self.sql("SELECT id FROM message") + message_tb_raw = str(self.sql("SELECT id FROM message")) # -2 to remove header return len(message_tb_raw.splitlines()) - 2 - def leader_node(self): - """ - returns `network_addr` field of node which hosts leader replica of database - `network_addr` is use to pattern match with container name - """ - self._check_published() - def get_int(text): - return int(re.search(r'\d+', text).group()) - - sql = f"select id from database where database_identity=0x{self.database_identity}" - db_id_tb = self.read_controldb(sql) - database_id = get_int(db_id_tb); - - - sql = f"select leader from replication_state where database_id={database_id}" - leader_tb = self.read_controldb(sql) - leader_id = get_int(leader_tb) - - - sql = f"select node_id from 
replica where id={leader_id}" - leader_node_tb = self.read_controldb(sql) - leader_node_id = get_int(leader_node_tb) - - sql = f"select network_addr from node where id={leader_node_id}" - leader_host_tb = self.read_controldb(sql) - lines = leader_host_tb.splitlines() - - # actual row starts from 3rd line - if len(lines) != 3: - return None - - leader_row = lines[2] - # Check if the line contains the network address - if "(some =" in leader_row: - address = leader_row.split('"')[1] - hostname = address.split(':')[0] - return hostname - return None +# def test_leader_election_in_loop(self): +# """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" +# +# for i in range(5): +# cur_leader = self.cluster.wait_for_leader_change(None) +# self.cluster.ensure_leader_health() +# +# print("killing current leader: {}", cur_leader) +# container_id = self.cluster.fail_leader() +# +# +# self.assertIsNotNone(container_id) +# +# next_leader = self.cluster.wait_for_leader_change(cur_leader) +# self.assertNotEqual(cur_leader, next_leader) +# # this check if leader election happened +# self.cluster.ensure_leader_health() +# self.assertEqual(self.count_rows(), 2 * (i+1)) +# # restart the old leader, so that we can maintain quorum for next iteration +# self.cluster.restore_leader(container_id, 'start') +# +# time.sleep(5) +# retry(lambda: self.call("clean_up")) +# +# +# def test_leader_disconnect_in_loop(self): +# """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" +# +# for i in range(5): +# cur_leader = self.cluster.wait_for_leader_change(None) +# self.cluster.ensure_leader_health() +# +# print("killing current leader: {}", cur_leader) +# container_id = self.cluster.fail_leader('disconnect') +# +# self.assertIsNotNone(container_id) +# +# next_leader = self.cluster.wait_for_leader_change(cur_leader) +# self.assertNotEqual(cur_leader, next_leader) +# # this check if leader election happened +# self.cluster.ensure_leader_health() +# self.assertEqual(self.count_rows(), 2 * (i+1)) +# +# # restart the old leader, so that we can maintain quorum for next iteration +# self.cluster.restore_leader(container_id, 'connect') +# +# time.sleep(5) +# retry(lambda: self.call("clean_up")) +# - def get_leader_container_id(self): - """Kill current leader container and return its""" - leader = self.leader_node() - containers = list_container() - for container in containers: - if leader in container: - container_id = container.split()[0] - return container_id - return None + def test_drain_leader_node(self): + """This test moves leader replica to different node""" + self.add_me_as_admin() + cur_leader_node_id = self.cluster.wait_for_leader_change(None) + self.cluster.ensure_leader_health() - def wait_for_leader_change(self, leader): - """Wait for leader to change""" - for i in range(10): - time.sleep(2) - next_leader = self.leader_node() - if next_leader != leader: - return next_leader - return None + replicas = self.cluster.get_all_replicas() + empty_node_id = 14 + for replica in replicas: + empty_node_id = empty_node_id - replica['node_id'] + self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}") - def test_leader_election_in_loop(self): - """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" - - for i in range(5): - cur_leader = self.wait_for_leader_change(None) - self.ensure_working_cluster(True) - 
print("killing current leader: {}", cur_leader) - cur_leader_id = self.get_leader_container_id() - kill_container(cur_leader_id) - self.assertIsNotNone(cur_leader_id) - - next_leader = self.wait_for_leader_change(cur_leader) - self.assertNotEqual(cur_leader, next_leader) - # this check if leader election happened - self.ensure_working_cluster(True) - self.assertEqual(self.count_rows(), 2 * (i+1)) - # restart the old leader, so that we can maintain quorum for next iteration - start_container(cur_leader_id) - - time.sleep(5) - retry_on_error(lambda: self.call("clean_up")) - - - def test_leader_disconnect_in_loop(self): - """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" - - for i in range(5): - cur_leader = self.wait_for_leader_change(None) - self.ensure_working_cluster(True) - cur_leader_id = self.get_leader_container_id() - disconnect_container(cur_leader_id) - self.assertIsNotNone(cur_leader_id) - - next_leader = self.wait_for_leader_change(cur_leader) - self.assertNotEqual(cur_leader, next_leader) - # this check if leader election happened - self.ensure_working_cluster(True) - self.assertEqual(self.count_rows(), 2 * (i+1)) - - # restart the old leader, so that we can maintain quorum for next iteration - connect_container(cur_leader_id) - time.sleep(5) - retry_on_error(lambda: self.call("clean_up")) + self.cluster.ensure_leader_health() + replicas = self.cluster.get_all_replicas() + for replica in replicas: + self.assertNotEqual(replica['node_id'], cur_leader_node_id) # def test_many_transactions(self): # """This test sends many messages to the database and verifies that they are all present""" From 36a27266105a5c3319a2d23744976a73c0695906 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 14 Feb 2025 22:56:17 +0530 Subject: [PATCH 7/8] leader prefer test --- smoketests/root_config.toml | 12 --- smoketests/tests/replication.py | 155 ++++++++++++++++++-------------- 2 files changed, 88 insertions(+), 79 deletions(-) delete mode 100644 smoketests/root_config.toml diff --git a/smoketests/root_config.toml b/smoketests/root_config.toml deleted file mode 100644 index 9874758f4e1..00000000000 --- a/smoketests/root_config.toml +++ /dev/null @@ -1,12 +0,0 @@ -default_server = "local" -spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwZTA0NmQ0YWUxM2M4NzQwMGI3ZDYyMWQyNjA3ZjBiYzYyNTgwZWExNzY0MmU3ZTcwNzg2M2U3ZTk1NmY3Iiwic3ViIjoicGxhY2Vob2xkZXItbm9kZS1pZCIsImlzcyI6ImxvY2FsaG9zdCIsImF1ZCI6W10sImlhdCI6MTczOTQ3MDY0OCwiZXhwIjoxNzM5NDcxMDA4fQ.Xh5zq5MPbdqWUv3Opt_PnuGkg4aCer_gg0mLNDLo5KDCZfdmTE3E9EJvBFcQcwloRcmWr_As9Hjbx2EnPrzItQ" - -[[server_configs]] -nickname = "local" -host = "127.0.0.1:3000" -protocol = "http" - -[[server_configs]] -nickname = "testnet" -host = "testnet.spacetimedb.com" -protocol = "https" diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 0693d4826ca..1216990acce 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -292,87 +292,108 @@ def count_rows(self): return len(message_tb_raw.splitlines()) - 2 -# def test_leader_election_in_loop(self): -# """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" -# -# for i in range(5): -# cur_leader = self.cluster.wait_for_leader_change(None) -# self.cluster.ensure_leader_health() -# -# print("killing current leader: {}", cur_leader) -# container_id = self.cluster.fail_leader() -# -# -# 
self.assertIsNotNone(container_id) -# -# next_leader = self.cluster.wait_for_leader_change(cur_leader) -# self.assertNotEqual(cur_leader, next_leader) -# # this check if leader election happened -# self.cluster.ensure_leader_health() -# self.assertEqual(self.count_rows(), 2 * (i+1)) -# # restart the old leader, so that we can maintain quorum for next iteration -# self.cluster.restore_leader(container_id, 'start') -# -# time.sleep(5) -# retry(lambda: self.call("clean_up")) -# -# -# def test_leader_disconnect_in_loop(self): -# """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" -# -# for i in range(5): -# cur_leader = self.cluster.wait_for_leader_change(None) -# self.cluster.ensure_leader_health() + def test_leader_b_election_in_loop(self): + """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + for i in range(5): + cur_leader = self.cluster.wait_for_leader_change(None) + self.cluster.ensure_leader_health() + + print("killing current leader: {}", cur_leader) + container_id = self.cluster.fail_leader() + + + self.assertIsNotNone(container_id) + + next_leader = self.cluster.wait_for_leader_change(cur_leader) + self.assertNotEqual(cur_leader, next_leader) + # this check if leader election happened + self.cluster.ensure_leader_health() + self.assertEqual(self.count_rows(), 2 * (i+1)) + # restart the old leader, so that we can maintain quorum for next iteration + self.cluster.restore_leader(container_id, 'start') + + time.sleep(5) + retry(lambda: self.call("clean_up")) + + def test_leader_c_disconnect_in_loop(self): + """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" + + for i in range(5): + cur_leader = self.cluster.wait_for_leader_change(None) + self.cluster.ensure_leader_health() + + container_id = self.cluster.fail_leader('disconnect') + + self.assertIsNotNone(container_id) + + next_leader = self.cluster.wait_for_leader_change(cur_leader) + self.assertNotEqual(cur_leader, next_leader) + # this check if leader election happened + self.cluster.ensure_leader_health() + self.assertEqual(self.count_rows(), 2 * (i+1)) + + # restart the old leader, so that we can maintain quorum for next iteration + self.cluster.restore_leader(container_id, 'connect') + time.sleep(1) + + time.sleep(5) + retry(lambda: self.call("clean_up")) + + + +# def test_drain_leader_node(self): +# """This test moves leader replica to different node""" +# self.add_me_as_admin() +# cur_leader_node_id = self.cluster.wait_for_leader_change(None) +# self.cluster.ensure_leader_health() # -# print("killing current leader: {}", cur_leader) -# container_id = self.cluster.fail_leader('disconnect') -# -# self.assertIsNotNone(container_id) +# replicas = self.cluster.get_all_replicas() +# empty_node_id = 14 +# for replica in replicas: +# empty_node_id = empty_node_id - replica['node_id'] +# self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}") # -# next_leader = self.cluster.wait_for_leader_change(cur_leader) -# self.assertNotEqual(cur_leader, next_leader) -# # this check if leader election happened -# self.cluster.ensure_leader_health() -# self.assertEqual(self.count_rows(), 2 * (i+1)) -# -# # restart the old leader, so that we can maintain quorum for next iteration -# self.cluster.restore_leader(container_id, 'connect') -# # time.sleep(5) -# retry(lambda: self.call("clean_up")) +# 
self.cluster.ensure_leader_health() +# replicas = self.cluster.get_all_replicas() +# for replica in replicas: +# self.assertNotEqual(replica['node_id'], cur_leader_node_id) # - - def test_drain_leader_node(self): + def test_prefer_leader(self): """This test moves leader replica to different node""" self.add_me_as_admin() cur_leader_node_id = self.cluster.wait_for_leader_change(None) self.cluster.ensure_leader_health() replicas = self.cluster.get_all_replicas() - empty_node_id = 14 + prefer_replica = {} for replica in replicas: - empty_node_id = empty_node_id - replica['node_id'] - self.spacetime("call", "spacetime-control", "drain_node", f"{cur_leader_node_id}", f"{empty_node_id}") + if replica['node_id'] != cur_leader_node_id: + prefer_replica = replica + break + prefer_replica_id = prefer_replica['replica_id'] + self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}") - time.sleep(5) + next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id) self.cluster.ensure_leader_health() - replicas = self.cluster.get_all_replicas() - for replica in replicas: - self.assertNotEqual(replica['node_id'], cur_leader_node_id) - -# def test_many_transactions(self): -# """This test sends many messages to the database and verifies that they are all present""" -# cur_leader = self.wait_for_leader_change(None) -# num_messages = 1000 -# self.add(range(num_messages+1)) -# message_table = self.sql(f"SELECT text FROM message where text='{num_messages}'") -# self.assertIn("1000", message_table) -# self.assertEqual(self.count_rows(), num_messages) -# -# retry_on_error(lambda: self.call("clean_up")) -# -# + self.assertEqual(prefer_replica['node_id'], next_leader_node_id) + self.assertEqual(self.count_rows(), 2) + + + def test_a_many_transactions(self): + """This test sends many messages to the database and verifies that they are all present""" + cur_leader = self.cluster.wait_for_leader_change(None) + num_messages = 1000 + self.add(range(num_messages)) + message_table = self.sql(f"SELECT text FROM message where text='{num_messages -1}'") + self.assertIn("999", message_table) + self.assertEqual(self.count_rows(), num_messages) + + + retry(lambda: self.call("clean_up")) + # def test_quorum_loss(self): # """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes""" # From f90eb53f78eaea9056f07e85e55f49590473cbe3 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Wed, 5 Mar 2025 16:06:52 +0530 Subject: [PATCH 8/8] faster tx module --- smoketests/config.toml | 3 +- smoketests/tests/replication.py | 135 ++++++++++++++++++++------------ 2 files changed, 84 insertions(+), 54 deletions(-) diff --git a/smoketests/config.toml b/smoketests/config.toml index 258c55e031c..866c5e585a9 100644 --- a/smoketests/config.toml +++ b/smoketests/config.toml @@ -1,6 +1,5 @@ default_server = "127.0.0.1:3000" - -spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwMTk4MDViYzU2ZDczMmUwN2ZjZjlkYWJjYTJkY2I3ZGIwMzMwYjQyOGJjMTI0Y2E0YmMzNWQxNTgzZTkxIiwic3ViIjoiMDMwYmM2NjAtMDdlZC00ZDg0LThkMTYtMDRmMTUzMTgzZjZhIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTczODk1MjY3OCwiZXhwIjpudWxsfQ.TInoGjszYrkVw6vqk_vfj_zgirJcf2-8aNQZHr8e0dJRYK9sCMh2RAjJ_odTbgXRmEIaW416vOZjFGVzuUH6Sg" +spacetimedb_token = 
"eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwZTJlMWRlNTY0YmU2YzAyMDU4YzQwYzg0ZDdhNjg4OWE3MzA4ZTRjYjA5OGU1NTRjZDhlZWE1MDNhN2E3Iiwic3ViIjoiMGFmOTM0OTUtZGY1Yy00Y2ViLTllNjktNmVhZDVlMDVmMGUzIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTc0MDczODI4OSwiZXhwIjpudWxsfQ.yF-glntsyd5xusUDFFtSWkPtONB70H4lLeDrd84m6ZCJ67W_1RegIqIpCDMaAJFmY5euDAO7UWUFiicJ86vSrA" [[server_configs]] nickname = "localhost" diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 1216990acce..976234f3d3d 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -198,15 +198,16 @@ def wait_for_leader_change(self, previous_leader_node, max_attempts=10, delay=2) time.sleep(delay) return None - def ensure_leader_health(self, wait_time=2): - """Verify leader is healthy by testing basic operations.""" + def ensure_leader_health(self, id, wait_time=2): + """Verify leader is healthy by inserting a row.""" if wait_time: time.sleep(wait_time) - rnd = random.randint(9000, 10000) - retry(lambda: self.test.call("add", f"{rnd}")) - add_table = str(self.test.sql(f"SELECT id, text FROM message where text='{rnd}'")) - return str(rnd) in add_table + retry(lambda: self.test.call("start", id, 1)) + add_table = str(self.test.sql(f"SELECT id FROM counter where id={id}")) + if str(id) not in add_table: + raise ValueError(f"Could not find {rnd} in counter table") + def fail_leader(self, action='kill'): """Force leader failure through either killing or network disconnect.""" @@ -238,27 +239,55 @@ def restore_leader(self, container_id, action='start'): @requires_docker class ReplicationTest(Smoketest): MODULE_CODE = """ -use spacetimedb::{ReducerContext, Table, log}; +use spacetimedb::{duration, ReducerContext, Table}; -#[spacetimedb::table(name = message, public)] -pub struct Message { +#[spacetimedb::table(name = counter, public)] +pub struct Counter { #[primary_key] #[auto_inc] id: u64, - text: String, + #[index(btree)] + value: u64, } -#[spacetimedb::reducer] -fn add(ctx: &ReducerContext, text: String) { - log::info!("adding message: {}", text); - ctx.db.message().insert(Message {id:0, text}); +#[spacetimedb::table(name = schedule_counter, public, scheduled(increment, at = sched_at))] +pub struct ScheduledCounter { + #[primary_key] + #[auto_inc] + scheduled_id: u64, + sched_at: spacetimedb::ScheduleAt, + count: u64, } +#[spacetimedb::reducer] +fn increment(ctx: &ReducerContext, arg: ScheduledCounter) { + // if the counter exists, increment it + if let Some(counter) = ctx.db.counter().id().find(arg.scheduled_id) { + if counter.value == arg.count { + ctx.db.schedule_counter().delete(arg); + return; + } + // update counter + ctx.db.counter().id().update(Counter { + id: arg.scheduled_id, + value: counter.value + 1, + }); + } else { + // insert fresh counter + ctx.db.counter().insert(Counter { + id: arg.scheduled_id, + value: 1, + }); + } +} #[spacetimedb::reducer] -fn clean_up(ctx: &ReducerContext) { - log::info!("cleaning up messages"); - ctx.db.message().iter().for_each(|m| {ctx.db.message().delete(m); }); +fn start(ctx: &ReducerContext, id: u64, count: u64) { + ctx.db.schedule_counter().insert(ScheduledCounter { + scheduled_id: id, + sched_at: duration!(0ms).into(), + count, + }); } """ @@ -281,47 +310,44 @@ def add_me_as_admin(self): spacetime("--config-path", self.root_config, "login", "--token", self.root_token) spacetime("--config-path", self.root_config, "call", "spacetime-control", "create_admin_account", f"0x{db_owner_id}") - def add(self, r: 
range): + def start(self, id: int, count: int): """Send a message to the database.""" - for i in r: - retry(lambda: self.call("add", f"{i}")) - - def count_rows(self): - message_tb_raw = str(self.sql("SELECT id FROM message")) - # -2 to remove header - return len(message_tb_raw.splitlines()) - 2 + retry(lambda: self.call("start", id, count)) - - def test_leader_b_election_in_loop(self): + def test_leader_election_in_loop(self): """This test fails a leader, wait for new leader to be elected and verify if commits replicated to new leader""" - - for i in range(5): + iterations = 5; + row_ids = [101 + i for i in range(iterations * 2)] + for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]): cur_leader = self.cluster.wait_for_leader_change(None) - self.cluster.ensure_leader_health() + self.cluster.ensure_leader_health(first_id) print("killing current leader: {}", cur_leader) container_id = self.cluster.fail_leader() - self.assertIsNotNone(container_id) next_leader = self.cluster.wait_for_leader_change(cur_leader) self.assertNotEqual(cur_leader, next_leader) # this check if leader election happened - self.cluster.ensure_leader_health() - self.assertEqual(self.count_rows(), 2 * (i+1)) + self.cluster.ensure_leader_health(second_id) # restart the old leader, so that we can maintain quorum for next iteration self.cluster.restore_leader(container_id, 'start') - time.sleep(5) - retry(lambda: self.call("clean_up")) + # verify if all past rows are present in new leader + for row_id in row_ids: + table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}") + self.assertIn(f"{row_id}", table) def test_leader_c_disconnect_in_loop(self): """This test disconnects a leader, wait for new leader to be elected and verify if commits replicated to new leader""" - for i in range(5): + iterations = 5; + row_ids = [201 + i for i in range(iterations * 2)] + + for (first_id, second_id) in zip(row_ids[::2], row_ids[1::2]): cur_leader = self.cluster.wait_for_leader_change(None) - self.cluster.ensure_leader_health() + self.cluster.ensure_leader_health(first_id) container_id = self.cluster.fail_leader('disconnect') @@ -330,16 +356,16 @@ def test_leader_c_disconnect_in_loop(self): next_leader = self.cluster.wait_for_leader_change(cur_leader) self.assertNotEqual(cur_leader, next_leader) # this check if leader election happened - self.cluster.ensure_leader_health() - self.assertEqual(self.count_rows(), 2 * (i+1)) + self.cluster.ensure_leader_health(second_id) # restart the old leader, so that we can maintain quorum for next iteration self.cluster.restore_leader(container_id, 'connect') time.sleep(1) - - time.sleep(5) - retry(lambda: self.call("clean_up")) + # verify if all past rows are present in new leader + for row_id in row_ids: + table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}") + self.assertIn(f"{row_id}", table) # def test_drain_leader_node(self): @@ -365,7 +391,7 @@ def test_prefer_leader(self): """This test moves leader replica to different node""" self.add_me_as_admin() cur_leader_node_id = self.cluster.wait_for_leader_change(None) - self.cluster.ensure_leader_health() + self.cluster.ensure_leader_health(301) replicas = self.cluster.get_all_replicas() prefer_replica = {} @@ -377,22 +403,27 @@ def test_prefer_leader(self): self.spacetime("call", "spacetime-control", "prefer_leader", f"{prefer_replica_id}") next_leader_node_id = self.cluster.wait_for_leader_change(cur_leader_node_id) - self.cluster.ensure_leader_health() + 
self.cluster.ensure_leader_health(302) self.assertEqual(prefer_replica['node_id'], next_leader_node_id) - self.assertEqual(self.count_rows(), 2) + + + # verify if all past rows are present in new leader + for row_id in [301, 302]: + table = self.spacetime("sql", self.database_identity, f"SELECT * FROM counter WHERE id = {row_id}") + self.assertIn(f"{row_id}", table) def test_a_many_transactions(self): """This test sends many messages to the database and verifies that they are all present""" - cur_leader = self.cluster.wait_for_leader_change(None) - num_messages = 1000 - self.add(range(num_messages)) - message_table = self.sql(f"SELECT text FROM message where text='{num_messages -1}'") - self.assertIn("999", message_table) - self.assertEqual(self.count_rows(), num_messages) + self.cluster.wait_for_leader_change(None) + num_messages = 10000 + sub = self.subscribe("select * from counter", n=num_messages) + self.start(1, num_messages) + + message_table = sub()[-1:]; + self.assertIn({'counter': {'deletes': [{'id': 1, 'value': 9999}], 'inserts': [{'id': 1, 'value': 10000}]}}, message_table) - retry(lambda: self.call("clean_up")) # def test_quorum_loss(self): # """This test makes cluster to lose majority of followers to verify if leader eventually stop accepting writes"""
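
Editor's note: the patches above lean on two small patterns that are easy to lift out and reuse outside the harness: retrying a flaky CLI call a fixed number of times (retry_on_error / retry), and polling the control database until a different leader is reported (wait_for_leader_change). The sketch below restates both as a standalone script under stated assumptions: it only assumes `docker compose` and the `spacetime` CLI are on PATH, the compose-file path mirrors the `--compose-file` default from patch 2, the "worker-1" service name is a placeholder, and `current_leader` is a simplified stand-in for the multi-query control-db lookup the tests actually perform. It is a sketch of the pattern, not the suite's implementation.

# standalone_sketch.py -- minimal sketch of the retry / wait-for-leader-change
# patterns used in smoketests/tests/replication.py. The compose-file path,
# service name, and control-db query are assumptions taken from the patches.
import subprocess
import time

COMPOSE_FILE = "./docker-compose.yml"   # default, overridable like --compose-file


def run(*args: str) -> str:
    """Run a command and return stripped stdout; raises on a non-zero exit."""
    result = subprocess.run(args, capture_output=True, text=True, check=True)
    return result.stdout.strip()


def retry(func, max_retries: int = 3, retry_delay: int = 2):
    """Call func(), retrying on any exception; return False once retries run out."""
    for attempt in range(1, max_retries + 1):
        try:
            return func()
        except Exception as exc:
            if attempt < max_retries:
                print(f"attempt {attempt} failed: {exc}; retrying in {retry_delay}s")
                time.sleep(retry_delay)
            else:
                print("max retries reached, giving up")
                return False


def current_leader() -> str:
    """Placeholder leader lookup: the real tests chain several
    `spacetime sql spacetime-control ...` queries and parse the rows."""
    return run("spacetime", "sql", "spacetime-control",
               "select leader from replication_state")


def wait_for_leader_change(previous, attempts: int = 10, delay: int = 2):
    """Poll until the reported leader differs from `previous`, or give up."""
    for _ in range(attempts):
        leader = retry(current_leader)
        if leader and leader != previous:
            return leader
        time.sleep(delay)
    return None


if __name__ == "__main__":
    old = retry(current_leader)
    # Simulate a failover by killing the leader's service (name is an assumption).
    retry(lambda: run("docker", "compose", "-f", COMPOSE_FILE, "kill", "worker-1"))
    print("new leader:", wait_for_leader_change(old))

Returning False instead of re-raising mirrors the change made in patch 4, where retry_on_error stops raising so that test_quorum_loss can detect the point at which a quorum-less leader stops accepting writes rather than aborting the test.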