Skip to content

Commit

Permalink
feat(cli): Mock RPC Server (#564)
Browse files Browse the repository at this point in the history
* Generalize `findAndUnpackTestSnapshots`
Factor it out into `findAndUnpackSnapshotFilePair` alongside
`unpackSnapshotFilePair`, for more general usage outside of tests.

* Add 'mock-rpc-server' subcommand
This will facilitate local manual testing of the RPC server with
curl and the like
  • Loading branch information
InKryption authored Feb 14, 2025
1 parent 024cd52 commit 3f6ab00
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 13 deletions.
37 changes: 28 additions & 9 deletions src/accountsdb/db.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3458,22 +3458,43 @@ pub fn findAndUnpackTestSnapshots(
) !SnapshotFiles {
comptime std.debug.assert(builtin.is_test); // should only be used in tests
const allocator = std.testing.allocator;

var test_data_dir = try std.fs.cwd().openDir(sig.TEST_DATA_DIR, .{ .iterate = true });
defer test_data_dir.close();
return try findAndUnpackSnapshotFilePair(allocator, n_threads, output_dir, test_data_dir);
}

const snapshot_files = try SnapshotFiles.find(allocator, test_data_dir);
pub fn findAndUnpackSnapshotFilePair(
allocator: std.mem.Allocator,
n_threads: usize,
dst_dir: std.fs.Dir,
/// Must be iterable.
src_dir: std.fs.Dir,
) !SnapshotFiles {
const snapshot_files = try SnapshotFiles.find(allocator, src_dir);
try unpackSnapshotFilePair(allocator, n_threads, dst_dir, src_dir, snapshot_files);
return snapshot_files;
}

/// Unpacks the identified snapshot files in `src_dir` into `dst_dir`.
pub fn unpackSnapshotFilePair(
allocator: std.mem.Allocator,
n_threads: usize,
dst_dir: std.fs.Dir,
src_dir: std.fs.Dir,
/// `= try SnapshotFiles.find(allocator, src_dir)`
snapshot_files: SnapshotFiles,
) !void {
{
const full_name_bounded = snapshot_files.full.snapshotArchiveName();
const full = snapshot_files.full;
const full_name_bounded = full.snapshotArchiveName();
const full_name = full_name_bounded.constSlice();
const full_archive_file = try test_data_dir.openFile(full_name, .{});
const full_archive_file = try src_dir.openFile(full_name, .{});
defer full_archive_file.close();
try parallelUnpackZstdTarBall(
allocator,
.noop,
full_archive_file,
output_dir,
dst_dir,
n_threads,
true,
);
Expand All @@ -3482,19 +3503,17 @@ pub fn findAndUnpackTestSnapshots(
if (snapshot_files.incremental()) |inc| {
const inc_name_bounded = inc.snapshotArchiveName();
const inc_name = inc_name_bounded.constSlice();
const inc_archive_file = try test_data_dir.openFile(inc_name, .{});
const inc_archive_file = try src_dir.openFile(inc_name, .{});
defer inc_archive_file.close();
try parallelUnpackZstdTarBall(
allocator,
.noop,
inc_archive_file,
output_dir,
dst_dir,
n_threads,
false,
);
}

return snapshot_files;
}

fn loadTestAccountsDB(
Expand Down
13 changes: 9 additions & 4 deletions src/accountsdb/download.zig
Original file line number Diff line number Diff line change
Expand Up @@ -486,14 +486,16 @@ pub fn getOrDownloadAndUnpackSnapshot(
};
}

const snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_path, .{
var snapshot_dir = try std.fs.cwd().makeOpenPath(snapshot_path, .{
.iterate = true,
});
defer snapshot_dir.close();

// download a new snapshot if required
const snapshot_exists = blk: {
_ = SnapshotFiles.find(allocator, snapshot_dir) catch {
break :blk false;
_ = SnapshotFiles.find(allocator, snapshot_dir) catch |err| switch (err) {
error.NoFullSnapshotFileInfoFound => break :blk false,
else => |e| return e,
};
break :blk true;
};
Expand Down Expand Up @@ -588,7 +590,10 @@ pub fn getOrDownloadAndUnpackSnapshot(
timer.reset();
logger.info().logf("unpacking {s}...", .{incremental_snapshot.snapshotArchiveName().constSlice()});

const archive_file = try snapshot_dir.openFile(incremental_snapshot.snapshotArchiveName().constSlice(), .{});
const archive_file = try snapshot_dir.openFile(
incremental_snapshot.snapshotArchiveName().constSlice(),
.{},
);
defer archive_file.close();

try parallelUnpackZstdTarBall(
Expand Down
88 changes: 88 additions & 0 deletions src/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,23 @@ pub fn main() !void {
},
},
},
&cli.Command{
.name = "mock-rpc-server",
.description = .{
.one_line = "Run a mock RPC server.",
},
.options = gossip_options_base ++
gossip_options_node ++
accounts_db_options_base ++
accounts_db_options_download ++
&[_]*cli.Option{&force_new_snapshot_download_option} ++
accounts_db_options_index,
.target = .{
.action = .{
.exec = mockRpcServer,
},
},
},
},
},
},
Expand Down Expand Up @@ -1254,6 +1271,77 @@ pub fn testTransactionSenderService() !void {
transaction_sender_handle.join();
}

fn mockRpcServer() !void {
const logger: sig.trace.Logger = .{ .direct_print = .{ .max_level = .trace } };

var snapshot_dir = try std.fs.cwd().makeOpenPath(current_config.accounts_db.snapshot_dir, .{
.iterate = true,
});
defer snapshot_dir.close();

const snap_files = try sig.accounts_db.db.findAndUnpackSnapshotFilePair(
gpa_allocator,
std.Thread.getCpuCount() catch 1,
snapshot_dir,
snapshot_dir,
);

const SnapshotGenerationInfo = sig.accounts_db.AccountsDB.SnapshotGenerationInfo;
var latest_snapshot_gen_info = sig.sync.RwMux(?SnapshotGenerationInfo).init(blk: {
const all_snap_fields = try FullAndIncrementalManifest.fromFiles(
gpa_allocator,
logger.unscoped(),
snapshot_dir,
snap_files,
);
defer all_snap_fields.deinit(gpa_allocator);

break :blk .{
.full = .{
.slot = snap_files.full.slot,
.hash = snap_files.full.hash,
.capitalization = all_snap_fields.full.bank_fields.capitalization,
},
.inc = inc: {
const inc = all_snap_fields.incremental orelse break :inc null;
// if the incremental snapshot field is not null, these shouldn't be either
const inc_info = snap_files.incremental_info.?;
const inc_persist = inc.bank_extra.snapshot_persistence.?;
break :inc .{
.slot = inc_info.slot,
.hash = inc_info.hash,
.capitalization = inc_persist.incremental_capitalization,
};
},
};
});

var server_ctx = try sig.rpc.server.Context.init(.{
.allocator = gpa_allocator,
.logger = logger,

.snapshot_dir = snapshot_dir,
.latest_snapshot_gen_info = &latest_snapshot_gen_info,

.read_buffer_size = sig.rpc.server.MIN_READ_BUFFER_SIZE,
.socket_addr = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, 8899),
.reuse_address = true,
});
defer server_ctx.joinDeinit();

var maybe_liou = try sig.rpc.server.LinuxIoUring.init(&server_ctx);
// TODO: currently `if (a) |*b|` on `a: ?noreturn` causes analysis of
// the unwrap block, even though `if (a) |b|` doesn't; fixed in 0.14
defer if (maybe_liou != null) maybe_liou.?.deinit();

var exit = std.atomic.Value(bool).init(false);
try sig.rpc.server.serve(
&exit,
&server_ctx,
if (maybe_liou != null) .{ .linux_io_uring = &maybe_liou.? } else .basic,
);
}

/// State that typically needs to be initialized at the start of the app,
/// and deinitialized only when the app exits.
const AppBase = struct {
Expand Down

0 comments on commit 3f6ab00

Please sign in to comment.