Skip to content

Commit

Permalink
feat(transaction-sender+rpc): implement initial transaction-sender an…
Browse files Browse the repository at this point in the history
…d revive rpc-client (#256)
  • Loading branch information
yewman authored Sep 4, 2024
1 parent 673b49b commit 8ed10e4
Show file tree
Hide file tree
Showing 27 changed files with 2,634 additions and 2,727 deletions.
783 changes: 783 additions & 0 deletions metrics/grafana/dashboards/transaction_sender_metrics.json

Large diffs are not rendered by default.

83 changes: 83 additions & 0 deletions src/cmd/cmd.zig
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,31 @@ pub fn run() !void {
},
},
},
&cli.Command{
.name = "test-transaction-sender",
.description = .{
.one_line = "Test transaction sender service",
.detailed =
\\Simulates a stream of transaction being sent to the transaction sender by
\\running a mock transaction generator thread. For the moment this just sends
\\transfer transactions between to hard coded testnet accounts.
,
},
.options = &.{
// gossip
&network_option,
&gossip_host_option,
&gossip_port_option,
&gossip_entrypoints_option,
&gossip_spy_node_option,
&gossip_dump_option,
},
.target = .{
.action = .{
.exec = testTransactionSenderService,
},
},
},
},
},
},
Expand Down Expand Up @@ -957,6 +982,64 @@ fn getLeaderScheduleFromCli(allocator: Allocator) !?SingleEpochLeaderSchedule {
null;
}

pub fn testTransactionSenderService() !void {
var app_base = try AppBase.init(gpa_allocator);

if (config.current.gossip.network) |net| {
if (!std.mem.eql(u8, net, "testnet")) {
@panic("Can only run transaction sender service on testnet!");
}
}

for (config.current.gossip.entrypoints) |entrypoint| {
if (std.mem.indexOf(u8, entrypoint, "testnet") == null) {
@panic("Can only run transaction sender service on testnet!");
}
}

const gossip_service, var gossip_manager = try startGossip(gpa_allocator, &app_base, &.{});
defer gossip_manager.deinit();

const transaction_channel = sig.sync.Channel(sig.transaction_sender.TransactionInfo).init(gpa_allocator, 100);
defer transaction_channel.deinit();

const transaction_sender_config = sig.transaction_sender.service.Config{
.cluster = .Testnet,
.socket = SocketAddr.init(app_base.my_ip, 0),
};

var mock_transfer_service = try sig.transaction_sender.MockTransferService.init(
gpa_allocator,
transaction_channel,
&app_base.exit,
);

var transaction_sender_service = try sig.transaction_sender.Service.init(
gpa_allocator,
transaction_sender_config,
transaction_channel,
&gossip_service.gossip_table_rw,
&app_base.exit,
app_base.logger,
);

const mock_transfer_generator_handle = try std.Thread.spawn(
.{},
sig.transaction_sender.MockTransferService.run,
.{&mock_transfer_service},
);

const transaction_sender_handle = try std.Thread.spawn(
.{},
sig.transaction_sender.Service.run,
.{&transaction_sender_service},
);

mock_transfer_generator_handle.join();
transaction_sender_handle.join();
gossip_manager.join();
}

/// State that typically needs to be initialized at the start of the app,
/// and deinitialized only when the app exits.
const AppBase = struct {
Expand Down
47 changes: 47 additions & 0 deletions src/core/leader_schedule.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const Epoch = sig.core.Epoch;
const Pubkey = sig.core.Pubkey;
const Slot = sig.core.Slot;
const WeightedRandomSampler = sig.rand.WeightedRandomSampler;
const RpcClient = sig.rpc.Client;

pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;

Expand Down Expand Up @@ -56,6 +57,52 @@ pub fn leaderScheduleFromBank(
};
}

pub fn leaderScheduleFromRpc(allocator: Allocator, start_slot: Slot, rpc_client: *RpcClient) !SingleEpochLeaderSchedule {
const rpc_leader_schedule_response = try rpc_client.getLeaderSchedule(allocator, null, .{});
defer rpc_leader_schedule_response.deinit();
const rpc_leader_schedule = try rpc_leader_schedule_response.result();

var num_leaders: u64 = 0;
for (rpc_leader_schedule.values()) |leader_slots| {
num_leaders += leader_slots.len;
}

const Record = struct { slot: Slot, key: Pubkey };

var leaders_index: usize = 0;
var leaders = try allocator.alloc(Record, num_leaders);
defer allocator.free(leaders);

var rpc_leader_iter = rpc_leader_schedule.iterator();
while (rpc_leader_iter.next()) |entry| {
const key = try Pubkey.fromString(entry.key_ptr.*);
for (entry.value_ptr.*) |slot| {
leaders[leaders_index] = .{ .slot = slot, .key = key };
leaders_index += 1;
}
}

std.mem.sortUnstable(Record, leaders, {}, struct {
fn gt(_: void, lhs: Record, rhs: Record) bool {
return switch (std.math.order(lhs.slot, rhs.slot)) {
.gt => false,
else => true,
};
}
}.gt);

var leader_pubkeys = try allocator.alloc(Pubkey, leaders.len);
for (leaders, 0..) |record, i| {
leader_pubkeys[i] = record.key;
}

return .{
.allocator = allocator,
.slot_leaders = leader_pubkeys,
.start_slot = start_slot,
};
}

pub const StakedNode = struct { id: Pubkey, stake: u64 };

pub fn leaderSchedule(
Expand Down
8 changes: 8 additions & 0 deletions src/core/signature.zig
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ pub const Signature = struct {
return base58.encode(self.data);
}

pub fn base58StringAlloc(self: Signature, allocator: std.mem.Allocator) std.mem.Allocator.Error![]const u8 {
return base58.encodeAlloc(self.data, allocator);
}

pub fn format(
self: Signature,
comptime _: []const u8,
Expand All @@ -59,4 +63,8 @@ pub const Signature = struct {
) !void {
return base58.format(self.data, writer);
}

pub fn jsonStringify(self: Signature, writer: anytype) !void {
try writer.print("\"{s}\"", .{self.base58String().slice()});
}
};
Loading

0 comments on commit 8ed10e4

Please sign in to comment.