From 3f6ab0077bf187d1fd6168dac5971f392cc565be Mon Sep 17 00:00:00 2001 From: InKryption <59504965+InKryption@users.noreply.github.com> Date: Fri, 14 Feb 2025 19:30:17 +0100 Subject: [PATCH] feat(cli): Mock RPC Server (#564) * Generalize `findAndUnpackTestSnapshots` Factor it out into `findAndUnpackSnapshotFilePair` alongside `unpackSnapshotFilePair`, for more general usage outside of tests. * Add 'mock-rpc-server' subcommand This will facilitate local manual testing of the RPC server with curl and the like --- src/accountsdb/db.zig | 37 ++++++++++++---- src/accountsdb/download.zig | 13 ++++-- src/cmd.zig | 88 +++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+), 13 deletions(-) diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index 70b4d8b2f..3af751746 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -3458,22 +3458,43 @@ pub fn findAndUnpackTestSnapshots( ) !SnapshotFiles { comptime std.debug.assert(builtin.is_test); // should only be used in tests const allocator = std.testing.allocator; - var test_data_dir = try std.fs.cwd().openDir(sig.TEST_DATA_DIR, .{ .iterate = true }); defer test_data_dir.close(); + return try findAndUnpackSnapshotFilePair(allocator, n_threads, output_dir, test_data_dir); +} - const snapshot_files = try SnapshotFiles.find(allocator, test_data_dir); +pub fn findAndUnpackSnapshotFilePair( + allocator: std.mem.Allocator, + n_threads: usize, + dst_dir: std.fs.Dir, + /// Must be iterable. + src_dir: std.fs.Dir, +) !SnapshotFiles { + const snapshot_files = try SnapshotFiles.find(allocator, src_dir); + try unpackSnapshotFilePair(allocator, n_threads, dst_dir, src_dir, snapshot_files); + return snapshot_files; +} +/// Unpacks the identified snapshot files in `src_dir` into `dst_dir`. +pub fn unpackSnapshotFilePair( + allocator: std.mem.Allocator, + n_threads: usize, + dst_dir: std.fs.Dir, + src_dir: std.fs.Dir, + /// `= try SnapshotFiles.find(allocator, src_dir)` + snapshot_files: SnapshotFiles, +) !void { { - const full_name_bounded = snapshot_files.full.snapshotArchiveName(); + const full = snapshot_files.full; + const full_name_bounded = full.snapshotArchiveName(); const full_name = full_name_bounded.constSlice(); - const full_archive_file = try test_data_dir.openFile(full_name, .{}); + const full_archive_file = try src_dir.openFile(full_name, .{}); defer full_archive_file.close(); try parallelUnpackZstdTarBall( allocator, .noop, full_archive_file, - output_dir, + dst_dir, n_threads, true, ); @@ -3482,19 +3503,17 @@ pub fn findAndUnpackTestSnapshots( if (snapshot_files.incremental()) |inc| { const inc_name_bounded = inc.snapshotArchiveName(); const inc_name = inc_name_bounded.constSlice(); - const inc_archive_file = try test_data_dir.openFile(inc_name, .{}); + const inc_archive_file = try src_dir.openFile(inc_name, .{}); defer inc_archive_file.close(); try parallelUnpackZstdTarBall( allocator, .noop, inc_archive_file, - output_dir, + dst_dir, n_threads, false, ); } - - return snapshot_files; } fn loadTestAccountsDB( diff --git a/src/accountsdb/download.zig b/src/accountsdb/download.zig index cef8fdd30..a6957f320 100644 --- a/src/accountsdb/download.zig +++ b/src/accountsdb/download.zig @@ -486,14 +486,16 @@ pub fn getOrDownloadAndUnpackSnapshot( }; } - const snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_path, .{ + var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_path, .{ .iterate = true, }); + defer snapshot_dir.close(); // download a new snapshot if required const snapshot_exists = blk: { - _ = SnapshotFiles.find(allocator, snapshot_dir) catch { - break :blk false; + _ = SnapshotFiles.find(allocator, snapshot_dir) catch |err| switch (err) { + error.NoFullSnapshotFileInfoFound => break :blk false, + else => |e| return e, }; break :blk true; }; @@ -588,7 +590,10 @@ pub fn getOrDownloadAndUnpackSnapshot( timer.reset(); logger.info().logf("unpacking {s}...", .{incremental_snapshot.snapshotArchiveName().constSlice()}); - const archive_file = try snapshot_dir.openFile(incremental_snapshot.snapshotArchiveName().constSlice(), .{}); + const archive_file = try snapshot_dir.openFile( + incremental_snapshot.snapshotArchiveName().constSlice(), + .{}, + ); defer archive_file.close(); try parallelUnpackZstdTarBall( diff --git a/src/cmd.zig b/src/cmd.zig index 15893f0b2..e57c57e48 100644 --- a/src/cmd.zig +++ b/src/cmd.zig @@ -680,6 +680,23 @@ pub fn main() !void { }, }, }, + &cli.Command{ + .name = "mock-rpc-server", + .description = .{ + .one_line = "Run a mock RPC server.", + }, + .options = gossip_options_base ++ + gossip_options_node ++ + accounts_db_options_base ++ + accounts_db_options_download ++ + &[_]*cli.Option{&force_new_snapshot_download_option} ++ + accounts_db_options_index, + .target = .{ + .action = .{ + .exec = mockRpcServer, + }, + }, + }, }, }, }, @@ -1254,6 +1271,77 @@ pub fn testTransactionSenderService() !void { transaction_sender_handle.join(); } +fn mockRpcServer() !void { + const logger: sig.trace.Logger = .{ .direct_print = .{ .max_level = .trace } }; + + var snapshot_dir = try std.fs.cwd().makeOpenPath(current_config.accounts_db.snapshot_dir, .{ + .iterate = true, + }); + defer snapshot_dir.close(); + + const snap_files = try sig.accounts_db.db.findAndUnpackSnapshotFilePair( + gpa_allocator, + std.Thread.getCpuCount() catch 1, + snapshot_dir, + snapshot_dir, + ); + + const SnapshotGenerationInfo = sig.accounts_db.AccountsDB.SnapshotGenerationInfo; + var latest_snapshot_gen_info = sig.sync.RwMux(?SnapshotGenerationInfo).init(blk: { + const all_snap_fields = try FullAndIncrementalManifest.fromFiles( + gpa_allocator, + logger.unscoped(), + snapshot_dir, + snap_files, + ); + defer all_snap_fields.deinit(gpa_allocator); + + break :blk .{ + .full = .{ + .slot = snap_files.full.slot, + .hash = snap_files.full.hash, + .capitalization = all_snap_fields.full.bank_fields.capitalization, + }, + .inc = inc: { + const inc = all_snap_fields.incremental orelse break :inc null; + // if the incremental snapshot field is not null, these shouldn't be either + const inc_info = snap_files.incremental_info.?; + const inc_persist = inc.bank_extra.snapshot_persistence.?; + break :inc .{ + .slot = inc_info.slot, + .hash = inc_info.hash, + .capitalization = inc_persist.incremental_capitalization, + }; + }, + }; + }); + + var server_ctx = try sig.rpc.server.Context.init(.{ + .allocator = gpa_allocator, + .logger = logger, + + .snapshot_dir = snapshot_dir, + .latest_snapshot_gen_info = &latest_snapshot_gen_info, + + .read_buffer_size = sig.rpc.server.MIN_READ_BUFFER_SIZE, + .socket_addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 8899), + .reuse_address = true, + }); + defer server_ctx.joinDeinit(); + + var maybe_liou = try sig.rpc.server.LinuxIoUring.init(&server_ctx); + // TODO: currently `if (a) |*b|` on `a: ?noreturn` causes analysis of + // the unwrap block, even though `if (a) |b|` doesn't; fixed in 0.14 + defer if (maybe_liou != null) maybe_liou.?.deinit(); + + var exit = std.atomic.Value(bool).init(false); + try sig.rpc.server.serve( + &exit, + &server_ctx, + if (maybe_liou != null) .{ .linux_io_uring = &maybe_liou.? } else .basic, + ); +} + /// State that typically needs to be initialized at the start of the app, /// and deinitialized only when the app exits. const AppBase = struct {