diff --git a/.cmake-format.py b/.cmake-format.py
index 3c73a4d4d8ad..3d68ccdba5c6 100644
--- a/.cmake-format.py
+++ b/.cmake-format.py
@@ -40,6 +40,17 @@
         "ADDITIONAL_ARGS": "*",
       },
     },
+    "add_piccolo_test": {
+      "kwargs": {
+        "NAME": "*",
+        "PYTHON_SCRIPT": "*",
+        "CONSTITUTION": "*",
+        "CLIENT_BIN": "*",
+        "VERIFICATION_FILE": "*",
+        "LABEL": "*",
+        "ADDITIONAL_ARGS": "*",
+      },
+    },
     "add_picobench": {
       "kwargs": {
         "SRCS": "*",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8dcdc0ef68ec..cfc31600093b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1438,29 +1438,29 @@ if(BUILD_TESTS)
       --max-writes-ahead 1000 --repetitions 10000
   )
 
-  add_perf_test(
+  add_piccolo_test(
     NAME pi_basic
     PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
     CLIENT_BIN ./submit
-    ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --repetitions
-                    100000
+    ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --client-def
+                    "1,write,100000,primary"
   )
 
-  add_perf_test(
+  add_piccolo_test(
     NAME pi_basic_js
     PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
     CLIENT_BIN ./submit
     ADDITIONAL_ARGS --js-app-bundle ${CMAKE_SOURCE_DIR}/samples/apps/basic/js
-                    --repetitions 100000
+                    --client-def "1,write,100000,primary"
   )
 
   if(WORKER_THREADS)
-    add_perf_test(
+    add_piccolo_test(
       NAME pi_basic_mt
       PYTHON_SCRIPT ${CMAKE_CURRENT_LIST_DIR}/tests/infra/basicperf.py
       CLIENT_BIN ./submit
-      ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --repetitions
-                      200000 --num-localhost-clients ${WORKER_THREADS}
+      ADDITIONAL_ARGS --package "samples/apps/basic/libbasic" --client-def
+                      "${WORKER_THREADS},write,100000,any"
     )
   endif()
diff --git a/cmake/common.cmake b/cmake/common.cmake
index 148c9560fb00..71500fa6c2c0 100644
--- a/cmake/common.cmake
+++ b/cmake/common.cmake
@@ -266,6 +266,85 @@ function(add_perf_test)
   )
 endfunction()
 
+# Helper for building end-to-end perf tests using the python infrastructure
+function(add_piccolo_test)
+
+  cmake_parse_arguments(
+    PARSE_ARGV 0 PARSED_ARGS ""
+    "NAME;PYTHON_SCRIPT;CONSTITUTION;CLIENT_BIN;VERIFICATION_FILE;LABEL"
+    "ADDITIONAL_ARGS"
+  )
+
+  if(NOT PARSED_ARGS_CONSTITUTION)
+    set(PARSED_ARGS_CONSTITUTION ${CCF_NETWORK_TEST_DEFAULT_CONSTITUTION})
+  endif()
+
+  set(TESTS_SUFFIX "")
+  set(ENCLAVE_TYPE "")
+  set(ENCLAVE_PLATFORM "${COMPILE_TARGET}")
+  if("sgx" STREQUAL COMPILE_TARGET)
+    set(TESTS_SUFFIX "${TESTS_SUFFIX}_sgx")
+    set(ENCLAVE_TYPE "release")
+  elseif("virtual" STREQUAL COMPILE_TARGET)
+    set(TESTS_SUFFIX "${TESTS_SUFFIX}_virtual")
+    set(ENCLAVE_TYPE "virtual")
+  endif()
+
+  set(TESTS_SUFFIX "${TESTS_SUFFIX}_cft")
+
+  set(TEST_NAME "${PARSED_ARGS_NAME}${TESTS_SUFFIX}")
+
+  set(LABEL_ARG "${TEST_NAME}^")
+
+  add_test(
+    NAME "${PARSED_ARGS_NAME}${TESTS_SUFFIX}"
+    COMMAND
+      ${PYTHON} ${PARSED_ARGS_PYTHON_SCRIPT} -b . -c ${PARSED_ARGS_CLIENT_BIN}
+      ${CCF_NETWORK_TEST_ARGS} ${PARSED_ARGS_CONSTITUTION} ${VERIFICATION_ARG}
+      --label ${LABEL_ARG} --snapshot-tx-interval 10000
+      ${PARSED_ARGS_ADDITIONAL_ARGS} -e ${ENCLAVE_TYPE} -t ${ENCLAVE_PLATFORM}
+      ${NODES}
+  )
+
+  # Make python test client framework importable
+  set_property(
+    TEST ${TEST_NAME}
+    APPEND
+    PROPERTY ENVIRONMENT "PYTHONPATH=${CCF_DIR}/tests:$ENV{PYTHONPATH}"
+  )
+  if(DEFINED DEFAULT_ENCLAVE_TYPE)
+    set_property(
+      TEST ${TEST_NAME}
+      APPEND
+      PROPERTY ENVIRONMENT "DEFAULT_ENCLAVE_TYPE=${DEFAULT_ENCLAVE_TYPE}"
+    )
+  endif()
+  if(DEFINED DEFAULT_ENCLAVE_PLATFORM)
+    set_property(
+      TEST ${TEST_NAME}
+      APPEND
+      PROPERTY ENVIRONMENT
+               "DEFAULT_ENCLAVE_PLATFORM=${DEFAULT_ENCLAVE_PLATFORM}"
+    )
+  endif()
+  set_property(
+    TEST ${TEST_NAME}
+    APPEND
+    PROPERTY LABELS perf
+  )
+  set_property(
+    TEST ${TEST_NAME}
+    APPEND
+    PROPERTY LABELS cft
+  )
+  set_property(
+    TEST ${TEST_NAME}
+    APPEND
+    PROPERTY ENVIRONMENT
+             "TSAN_OPTIONS=suppressions=${CCF_DIR}/tsan_env_suppressions"
+  )
+endfunction()
+
 # Picobench wrapper
 function(add_picobench name)
   cmake_parse_arguments(
diff --git a/samples/apps/basic/basic.cpp b/samples/apps/basic/basic.cpp
index 1865d157c505..dba3f4c08aac 100644
--- a/samples/apps/basic/basic.cpp
+++ b/samples/apps/basic/basic.cpp
@@ -86,6 +86,23 @@ namespace basicapp
       make_read_only_endpoint(
         "/records/{key}", HTTP_GET, get, {ccf::user_cert_auth_policy})
         .install();
+
+      auto post = [this](ccf::endpoints::EndpointContext& ctx) {
+        const nlohmann::json body =
+          nlohmann::json::parse(ctx.rpc_ctx->get_request_body());
+
+        const auto records = body.get<std::map<std::string, std::string>>();
+
+        auto records_handle = ctx.tx.template rw<RecordsMap>(PRIVATE_RECORDS);
+        for (const auto& [key, value] : records)
+        {
+          const std::vector<uint8_t> value_vec(value.begin(), value.end());
+          records_handle->put(key, value_vec);
+        }
+        ctx.rpc_ctx->set_response_status(HTTP_STATUS_NO_CONTENT);
+      };
+      make_endpoint("/records", HTTP_POST, post, {ccf::user_cert_auth_policy})
+        .install();
     }
   };
 }
diff --git a/samples/apps/basic/js/app.json b/samples/apps/basic/js/app.json
index 6073db8865d0..35bef8fa9554 100644
--- a/samples/apps/basic/js/app.json
+++ b/samples/apps/basic/js/app.json
@@ -17,6 +17,16 @@
         "mode": "readwrite",
         "openapi": {}
       }
+    },
+    "/records": {
+      "post": {
+        "js_module": "basic.js",
+        "js_function": "post_records",
+        "forwarding_required": "always",
+        "authn_policies": ["user_cert"],
+        "mode": "readwrite",
+        "openapi": {}
+      }
     }
   }
 }
diff --git a/samples/apps/basic/js/src/basic.js b/samples/apps/basic/js/src/basic.js
index b7ed55fb7a03..65c81bc4ac8e 100644
--- a/samples/apps/basic/js/src/basic.js
+++ b/samples/apps/basic/js/src/basic.js
@@ -32,3 +32,15 @@ export function get_record(request) {
     body: val,
   };
 }
+
+export function post_records(request) {
+  const records = request.body.json();
+
+  for (let key in records) {
+    records_table.set(ccf.strToBuf(key), ccf.strToBuf(records[key]));
+  }
+
+  return {
+    statusCode: 204,
+  };
+}
diff --git a/tests/infra/basicperf.py b/tests/infra/basicperf.py
index e324279b16d4..2ed50f6de377 100644
--- a/tests/infra/basicperf.py
+++ b/tests/infra/basicperf.py
@@ -12,28 +12,11 @@
 import hashlib
 from piccolo import generator
 import polars as pl
-from typing import Dict
+from typing import Dict, List
 import random
 import string
 
 
-def minimum_number_of_local_nodes(args):
-    if args.send_tx_to == "backups":
-        return 2
-
-    return 1
-
-
-def filter_nodes(primary, backups, filter_type):
-    if filter_type == "primary":
-        return [primary]
-    elif filter_type == "backups":
-        assert backups, "--send-tx-to backups but no backup was found"
-        return backups
-    else:
-        return [primary] + backups
-
-
 def configure_remote_client(args, client_id, client_host, common_dir):
     if client_host == "localhost":
         client_host = infra.net.expand_localhost()
@@ -61,27 +44,65 @@ def configure_remote_client(args, client_id, client_host, common_dir):
         raise
 
 
-def write_to_random_keys(
-    repetitions: int, msgs: generator.Messages, additional_headers: Dict[str, str]
+def write_to_key_space(
+    key_space: List[str],
+    iterations: int,
+    msgs: generator.Messages,
+    additional_headers: Dict[str, str],
 ):
     """
     Write fixed-size messages to a range of keys, this is the usual logging workload
     CCF has been running in various forms since early on. Each transaction produces
     a ledger entry and causes replication to backups.
     """
-    batch_size = 100
-    LOG.info(f"Workload: {repetitions} writes to a range of {batch_size} keys")
-    for i in range(repetitions):
-        key = f"{i % batch_size}"
+    LOG.info(f"Workload: {iterations} writes to a range of {len(key_space)} keys")
+    indices = list(range(iterations))
+    random.shuffle(indices)
+    for index in indices:
+        key = key_space[index % len(key_space)]
         msgs.append(
             f"/records/{key}",
             "PUT",
             additional_headers=additional_headers,
-            body=f"{hashlib.md5(str(i).encode()).hexdigest()}",
+            body=f"{hashlib.md5(key.encode()).hexdigest()}",
+            content_type="text/plain",
+        )
+
+
+def read_from_key_space(
+    key_space: List[str],
+    iterations: int,
+    msgs: generator.Messages,
+    additional_headers: Dict[str, str],
+):
+    LOG.info(f"Workload: {iterations} reads from a range of {len(key_space)} keys")
+    indices = list(range(iterations))
+    random.shuffle(indices)
+    for index in indices:
+        key = key_space[index % len(key_space)]
+        msgs.append(
+            f"/records/{key}",
+            "GET",
+            additional_headers=additional_headers,
             content_type="text/plain",
         )
 
 
+def append_to_msgs(definition, key_space, iterations, msgs, additional_headers):
+    if definition == "write":
+        return write_to_key_space(key_space, iterations, msgs, additional_headers)
+    elif definition == "read":
+        return read_from_key_space(key_space, iterations, msgs, additional_headers)
+    elif definition.startswith("rwmix:"):
+        _, ratio = definition.split(":")
+        assert iterations % 1000 == 0
+        return RWMix(1000, float(ratio))(
+            key_space, iterations, msgs, additional_headers
+        )
+    else:
+        raise NotImplementedError(f"No generator for {definition}")
+
+
 class RWMix:
     """
    Similar to write_to_random_keys, but with the additions of reads back from the keys.
@@ -99,6 +120,7 @@ def __init__(self, batch_size: int, write_fraction: float, msg_len=20):
 
     def __call__(
         self,
+        key_space: List[str],
         repetitions: int,
         msgs: generator.Messages,
         additional_headers: Dict[str, str],
@@ -115,9 +137,10 @@ def __call__(
                 )
             )
             # Randomly shuffle the keys to be written/read
-            keys = list(range(self.batch_size))
-            random.shuffle(keys)
-            for key in keys:
+            indices = list(range(self.batch_size))
+            random.shuffle(indices)
+            for index in indices:
+                key = key_space[index % len(key_space)]
                 # The first batch always writes to all keys, to make sure they are initialised
                 if (batch == 0) or (key in writes):
                     msgs.append(
@@ -137,28 +160,24 @@ def __call__(
                     )
 
 
-def configure_client_hosts(args, backups):
-    client_hosts = []
-    if args.one_client_per_backup:
-        assert backups, "--one-client-per-backup was set but no backup was found"
-        client_hosts = ["localhost"] * len(backups)
-    else:
-        if args.client_nodes:
-            client_hosts.extend(args.client_nodes)
-
-        if args.num_localhost_clients:
-            client_hosts.extend(["localhost"] * int(args.num_localhost_clients))
-
-        if not client_hosts:
-            client_hosts = ["localhost"]
-    return client_hosts
-
-
-def run(args, append_messages):
-    hosts = args.nodes
-    if not hosts:
-        hosts = ["local://localhost"] * minimum_number_of_local_nodes(args)
-
+def create_and_fill_key_space(size: int, primary: infra.node.Node) -> List[str]:
+    LOG.info(f"Creating and filling key space of size {size}")
+    space = [f"{i}" for i in range(size)]
+    mapping = {key: f"{hashlib.md5(key.encode()).hexdigest()}" for key in space}
+    with primary.client("user0") as c:
+        r = c.post("/records", mapping)
+        assert r.status_code == http.HTTPStatus.NO_CONTENT, r
+        # Quick sanity check
+        for j in [0, -1]:
+            r = c.get(f"/records/{space[j]}")
+            assert r.status_code == http.HTTPStatus.OK, r
+            assert r.body.text() == mapping[space[j]], r
+    LOG.info("Key space created and filled")
+    return space
+
+
+def run(args):
+    hosts = args.nodes or ["local://localhost"]
     LOG.info("Starting nodes on {}".format(hosts))
     with infra.network.network(
         hosts, args.binary_dir, args.debug_nodes, args.perf_nodes, pdb=args.pdb
@@ -173,52 +192,64 @@ def run(args, append_messages):
             jwt = jwt_issuer.issue_jwt()
             additional_headers["Authorization"] = f"Bearer {jwt}"
 
-        client_hosts = configure_client_hosts(args, backups)
-        requests_file_paths = []
-
-        for client_idx in range(len(client_hosts)):
-            LOG.info(f"Generating {args.repetitions} requests for client_{client_idx}")
-            msgs = generator.Messages()
-            append_messages(args.repetitions, msgs, additional_headers)
-
-            path_to_requests_file = os.path.join(
-                network.common_dir, f"pi_client{client_idx}_requests.parquet"
-            )
-            LOG.info(f"Writing generated requests to {path_to_requests_file}")
-            msgs.to_parquet_file(path_to_requests_file)
-            requests_file_paths.append(path_to_requests_file)
+        key_space = create_and_fill_key_space(args.key_space_size, primary)
 
         clients = []
-        nodes_to_send_to = filter_nodes(primary, backups, args.send_tx_to)
-        for client_id, client_host in enumerate(client_hosts):
-            node = nodes_to_send_to[client_id % len(nodes_to_send_to)]
-
-            remote_client = configure_remote_client(
-                args, client_id, client_host, network.common_dir
-            )
-            remote_client.setcmd(
-                [
-                    args.client,
-                    "--cert",
-                    "user0_cert.pem",
-                    "--key",
-                    "user0_privk.pem",
-                    "--cacert",
-                    os.path.basename(network.cert_path),
-                    f"--server-address={node.get_public_rpc_host()}:{node.get_public_rpc_port()}",
-                    "--max-writes-ahead",
-                    str(args.max_writes_ahead),
-                    "--send-filepath",
-                    "pi_requests.parquet",
-                    "--response-filepath",
-                    "pi_response.parquet",
-                    "--generator-filepath",
-                    os.path.abspath(requests_file_paths[client_id]),
-                    "--pid-file-path",
-                    "cmd.pid",
-                ]
-            )
-            clients.append(remote_client)
+        client_idx = 0
+        requests_file_paths = []
+        for client_def in args.client_def:
+            count, gen, iterations, target = client_def.split(",")
+            rr_idx = 0
+            for _ in range(int(count)):
+                LOG.info(f"Generating {iterations} requests for client_{client_idx}")
+                msgs = generator.Messages()
+                append_to_msgs(
+                    gen, key_space, int(iterations), msgs, additional_headers
+                )
+                path_to_requests_file = os.path.join(
+                    network.common_dir, f"pi_client{client_idx}_requests.parquet"
+                )
+                LOG.info(f"Writing generated requests to {path_to_requests_file}")
+                msgs.to_parquet_file(path_to_requests_file)
+                requests_file_paths.append(path_to_requests_file)
+                node = None
+                if target == "primary":
+                    node = primary
+                elif target == "backup":
+                    node = backups[rr_idx % len(backups)]
+                    rr_idx += 1
+                elif target == "any":
+                    node = network.nodes[rr_idx % len(network.nodes)]
+                    rr_idx += 1
+                else:
+                    raise NotImplementedError(f"Unknown target {target}")
+                remote_client = configure_remote_client(
+                    args, client_idx, "localhost", network.common_dir
+                )
+                remote_client.setcmd(
+                    [
+                        args.client,
+                        "--cert",
+                        "user0_cert.pem",
+                        "--key",
+                        "user0_privk.pem",
+                        "--cacert",
+                        os.path.basename(network.cert_path),
+                        f"--server-address={node.get_public_rpc_host()}:{node.get_public_rpc_port()}",
+                        "--max-writes-ahead",
+                        str(args.max_writes_ahead),
+                        "--send-filepath",
+                        "pi_requests.parquet",
+                        "--response-filepath",
+                        "pi_response.parquet",
+                        "--generator-filepath",
+                        os.path.abspath(path_to_requests_file),
+                        "--pid-file-path",
+                        "cmd.pid",
+                    ]
+                )
+                clients.append(remote_client)
+                client_idx += 1
 
         if args.network_only:
             for remote_client in clients:
@@ -383,64 +414,35 @@ def cli_args():
         help="List of hostnames[,pub_hostnames:ports]. If empty, spawn minimum working number of local nodes (minimum depends on consensus and other args)",
         action="append",
     )
-    client_args_group = parser.add_mutually_exclusive_group()
-    client_args_group.add_argument(
-        "-cn",
-        "--client-nodes",
-        help="List of hostnames for spawning client(s). If empty, one client is spawned locally",
-        action="append",
-    )
-    client_args_group.add_argument(
-        "--one-client-per-backup",
-        help="If set, allocates one (local) client per backup",
-        action="store_true",
-    )
-    parser.add_argument(
-        "-nlc",
-        "--num-localhost-clients",
-        help="The number of localhost clients. \
-            This argument is cumulative with the client-nodes and one-client-per-backup and arguments",
-    )
-    parser.add_argument(
-        "--send-tx-to",
-        choices=["primary", "backups", "all"],
-        default="all",
-        help="Send client requests only to primary, only to backups, or to all nodes",
-    )
     parser.add_argument(
         "--use-jwt",
         help="Use JWT with a temporary issuer as authentication method.",
         action="store_true",
     )
-    parser.add_argument(
-        "--repetitions",
-        help="Number of requests to send",
-        type=int,
-        default=100,
-    )
-    parser.add_argument(
-        "--write-tx-times",
-        help="Unused, swallowed for compatibility with old args",
-        action="store_true",
-    )
     parser.add_argument(
         "--client-timeout-s",
         help="Number of seconds after which unresponsive clients are shut down",
         default=90,
         type=float,
     )
-    parser.add_argument(
-        "--rw-mix",
-        help="Run a batched, fractional read/write mix instead of pure writes",
-        type=float,
-    )
     parser.add_argument(
         "--max-writes-ahead",
         help="Maximum number of writes to send to the server without waiting for a response",
         type=int,
         default=1000,
     )
-
+    parser.add_argument(
+        "--key-space-size",
+        help="Size of the key space to be pre-populated and which writes and reads will be performed on",
+        type=int,
+        default=1000,
+    )
+    parser.add_argument(
+        "--client-def",
+        help="Client definitions, e.g. '3,write,1000,primary' starts 3 clients sending 1000 writes to the primary",
+        action="append",
+        required=True,
+    )
     return infra.e2e_args.cli_args(
         parser=parser, accept_unknown=False, ledger_chunk_bytes_override="5MB"
     )
@@ -448,7 +450,4 @@ def cli_args():
 
 if __name__ == "__main__":
     args = cli_args()
-    if args.rw_mix is None:
-        run(args, write_to_random_keys)
-    else:
-        run(args, RWMix(1000, args.rw_mix))
+    run(args)
diff --git a/tests/infra/e2e_args.py b/tests/infra/e2e_args.py
index aa30e6044e96..de38643f4ae1 100644
--- a/tests/infra/e2e_args.py
+++ b/tests/infra/e2e_args.py
@@ -279,7 +279,7 @@ def cli_args(
         "--ledger-chunk-bytes",
         help="Size (bytes) at which a new ledger chunk is created",
         type=str,
-        default="20KB" or ledger_chunk_bytes_override,
+        default=ledger_chunk_bytes_override or "20KB",
     )
     parser.add_argument(
         "--snapshot-tx-interval",
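-- 
Usage note (reviewer addition, not part of the patch): per the code above, the
new --client-def flag takes a comma-separated quadruple
"<count>,<generator>,<iterations>,<target>", where <generator> is "write",
"read", or "rwmix:<write_fraction>" (rwmix iteration counts must be a multiple
of 1000), and <target> is "primary", "backup", or "any". A hypothetical local
invocation from a build directory, with illustrative values only:

  python tests/infra/basicperf.py -b . -c ./submit \
    --package samples/apps/basic/libbasic \
    --client-def "2,write,100000,primary" \
    --client-def "1,rwmix:0.5,10000,any"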