fix(shred-network): keep up with turbine (#493)
* fix(shred-network): keep up with turbine

* feat(shred-network): simplify repair logic, remove metric-like logging, try to complete slots more eagerly

* fix(shred-network): tests + linting

* fix(shred-network): test

* fix(repair): add jitter to handle an edge case where all of the following are true:

- we've fallen at least 1024 slots behind, or turbine isn't working
- the current bottom slot is skipped
- the next slot is skipped
- the slot 25 slots ahead is also skipped

with jitter, we only run into a problem if there are also 40 consecutive skipped slots, which is practically impossible (see the sketch after the commit message below)

* grafana: update shred collector dashboard

* refactor(ledger): switch formatting

Signed-off-by: Drew Nutter <[email protected]>

* refactor(shred-network): function spacing

Signed-off-by: Drew Nutter <[email protected]>

* feat(time): expand Instant to support shred repair

Signed-off-by: Drew Nutter <[email protected]>

* rename rng to prng

* fix timer usage

* refactor(time,repair): use Duration for repair sleep delay

* remove unused

* rename shred collector to shred network

* docs: shred network cli args

---------

Signed-off-by: Drew Nutter <[email protected]>
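
The jitter idea above can be shown with a small, self-contained Zig sketch. This is not the repair code from this commit: the function and constant names are hypothetical, and only the 25-slot lookahead and roughly-40-slot jitter window are taken from the commit message.

```zig
const std = @import("std");

// Values taken from the commit message; everything else in this sketch is a
// hypothetical illustration, not the repair service's actual implementation.
const LOOKAHEAD_SLOTS: u64 = 25;
const MAX_JITTER_SLOTS: u64 = 40;

/// Pick the highest slot to request repairs for. Without jitter the target is
/// always `bottom_slot + LOOKAHEAD_SLOTS`, so a handful of unluckily placed
/// skipped slots can stall progress; the random offset breaks that alignment.
fn highestSlotToRepair(prng: std.Random, bottom_slot: u64) u64 {
    const jitter = prng.uintLessThan(u64, MAX_JITTER_SLOTS);
    return bottom_slot + LOOKAHEAD_SLOTS + jitter;
}

pub fn main() void {
    var prng_state = std.Random.DefaultPrng.init(0);
    const prng = prng_state.random();
    std.debug.print("repair up to slot {}\n", .{highestSlotToRepair(prng, 1024)});
}
```

With the offset randomized, repair only stalls if every slot across the whole jitter window is skipped, which matches the "40 consecutive skipped slots" condition the commit message calls practically impossible.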
dnut authored Jan 23, 2025
1 parent 0e8853c commit 0784b21
Showing 14 changed files with 1,131 additions and 444 deletions.

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion scripts/style.py
@@ -205,7 +205,7 @@ def line_length(args, files_to_check):
# are "//"
if line.strip().startswith("//"):
continue
if len(line) > MAX_LINE_LENGTH:
if len(line) > MAX_LINE_LENGTH + 1: # +1 for \n
print(f"{path}:{i + 1} is too long: {len(line)}")
lines_found += 1
if path not in unique_files:
57 changes: 37 additions & 20 deletions src/cmd/cmd.zig
@@ -22,7 +22,7 @@ const ScopedLogger = sig.trace.ScopedLogger;
const Network = config.Network;
const ChannelPrintLogger = sig.trace.ChannelPrintLogger;
const Pubkey = sig.core.Pubkey;
const ShredCollectorDependencies = sig.shred_network.ShredCollectorDependencies;
const ShredNetworkDependencies = sig.shred_network.ShredNetworkDependencies;
const LeaderSchedule = sig.core.leader_schedule.LeaderSchedule;
const SnapshotFiles = sig.accounts_db.SnapshotFiles;
const SocketAddr = sig.net.SocketAddr;
@@ -149,6 +149,22 @@ pub fn run() !void {
.value_name = "slot number",
};

var retransmit_option = cli.Option{
.long_name = "no-retransmit",
.help = "Shreds will be received and stored but not retransmitted",
.value_ref = cli.mkRef(&config.current.shred_network.no_retransmit),
.required = false,
.value_name = "Disable Shred Retransmission",
};

var dump_shred_tracker = cli.Option{
.long_name = "dump-shred-tracker",
.help = "Create shred-tracker.txt to visually represent the currently tracked slots.",
.value_ref = cli.mkRef(&config.current.shred_network.dump_shred_tracker),
.required = false,
.value_name = "Dump Shred Tracker",
};

var gossip_entrypoints_option = cli.Option{
.long_name = "entrypoint",
.help = "gossip address of the entrypoint validators",
@@ -472,9 +488,9 @@ pub fn run() !void {
},

&cli.Command{
.name = "shred-collector",
.description = .{ .one_line = "Run the shred collector to collect and store shreds", .detailed =
\\ This command runs the shred collector without running the full validator
.name = "shred-network",
.description = .{ .one_line = "Run the shred network to collect and store shreds", .detailed =
\\ This command runs the shred network without running the full validator
\\ (mainly excluding the accounts-db setup).
\\
\\ NOTE: this means that this command *requires* a leader schedule to be provided
@@ -493,13 +509,14 @@
&gossip_entrypoints_option,
&gossip_spy_node_option,
&gossip_dump_option,
// repair
// shred_network
&turbine_recv_port_option,
&repair_port_option,
&test_repair_option,
// turbine
&dump_shred_tracker,
&turbine_num_retransmit_threads,
&turbine_overwrite_stake_for_testing,
&retransmit_option,
// blockstore cleanup service
&max_shreds_option,
// general
@@ -509,7 +526,7 @@
},
.target = .{
.action = .{
.exec = shredCollector,
.exec = shredNetwork,
},
},
},
@@ -845,12 +862,10 @@ fn validator() !void {
.{ &rpc_epoch_ctx_service, &app_base.exit },
);

// shred collector
var shred_col_conf = config.current.shred_network;
shred_col_conf.start_slot = shred_col_conf.start_slot orelse loaded_snapshot.collapsed_manifest.bank_fields.slot;
// shred network
var shred_network_manager = try sig.shred_network.start(
shred_col_conf,
ShredCollectorDependencies{
config.current.shred_network.toConfig(loaded_snapshot.collapsed_manifest.bank_fields.slot),
ShredNetworkDependencies{
.allocator = allocator,
.logger = app_base.logger.unscoped(),
.registry = app_base.metrics_registry,
@@ -873,7 +888,7 @@ fn validator() !void {
shred_network_manager.join();
}

fn shredCollector() !void {
fn shredNetwork() !void {
const allocator = gpa_allocator;
var app_base = try AppBase.init(allocator);
defer {
@@ -888,14 +903,16 @@ fn shredCollector() !void {
var rpc_client = sig.rpc.Client.init(allocator, genesis_config.cluster_type, .{});
defer rpc_client.deinit();

var shred_network_conf = config.current.shred_network;
shred_network_conf.start_slot = shred_network_conf.start_slot orelse blk: {
const response = try rpc_client.getSlot(allocator, .{});
break :blk try response.result();
};
const shred_network_conf = config.current.shred_network.toConfig(
config.current.shred_network.start_slot orelse blk: {
const response = try rpc_client.getSlot(allocator, .{});
break :blk try response.result();
},
);
app_base.logger.info().logf("Starting from slot: {?}", .{shred_network_conf.start_slot});

const repair_port: u16 = config.current.shred_network.repair_port;
const turbine_recv_port: u16 = config.current.shred_network.turbine_recv_port;
const repair_port: u16 = shred_network_conf.repair_port;
const turbine_recv_port: u16 = shred_network_conf.turbine_recv_port;

var gossip_service = try startGossip(allocator, &app_base, &.{
.{ .tag = .repair, .port = repair_port },
29 changes: 23 additions & 6 deletions src/cmd/config.zig
@@ -2,7 +2,7 @@ const std = @import("std");
const sig = @import("../sig.zig");

const ACCOUNT_INDEX_SHARDS = sig.accounts_db.db.ACCOUNT_INDEX_SHARDS;
const ShredCollectorConfig = sig.shred_network.ShredCollectorConfig;
const ShredNetworkConfig = sig.shred_network.ShredNetworkConfig;
const IpAddr = sig.net.IpAddr;
const LogLevel = sig.trace.Level;
const Cluster = sig.core.Cluster;
@@ -15,7 +15,7 @@ const getAccountPerFileEstimateFromCluster =
pub const Config = struct {
identity: IdentityConfig = .{},
gossip: GossipConfig = .{},
shred_network: ShredCollectorConfig = shred_network_defaults,
shred_network: ShredNetworkCliArgs = .{},
accounts_db: AccountsDBConfig = .{},
geyser: GeyserConfig = .{},
turbine: TurbineConfig = .{},
@@ -110,10 +110,27 @@ pub const GossipConfig = struct {
}
};

pub const shred_network_defaults = ShredCollectorConfig{
.turbine_recv_port = 8002,
.repair_port = 8003,
.start_slot = null,
/// The command-line arguments that are used to configure the shred network. The
/// CLI args are slightly different from the `shred_network.start` inputs, so they
/// get their own struct. `ShredNetworkConfig` represents the inputs to the start
/// function.
const ShredNetworkCliArgs = struct {
start_slot: ?sig.core.Slot = null,
repair_port: u16 = 8003,
turbine_recv_port: u16 = 8002,
no_retransmit: bool = true,
dump_shred_tracker: bool = false,

/// Converts from the CLI args into the `shred_network.start` parameters
pub fn toConfig(self: ShredNetworkCliArgs, fallback_slot: sig.core.Slot) ShredNetworkConfig {
return .{
.start_slot = self.start_slot orelse fallback_slot,
.repair_port = self.repair_port,
.turbine_recv_port = self.turbine_recv_port,
.retransmit = !self.no_retransmit,
.dump_shred_tracker = self.dump_shred_tracker,
};
}
};

/// Analogous to [AccountsDbConfig](https://github.com/anza-xyz/agave/blob/4c921ca276bbd5997f809dec1dd3937fb06463cc/accounts-db/src/accounts_db.rs#L597)
2 changes: 1 addition & 1 deletion src/ledger/benchmarks.zig
@@ -58,7 +58,7 @@ pub const BenchmarkLedger = struct {
}

var timer = try sig.time.Timer.start();
_ = try inserter.insertShreds(shreds, is_repairs, null, false, null);
_ = try inserter.insertShreds(shreds, is_repairs, null, false, null, null);
return timer.read();
}

18 changes: 9 additions & 9 deletions src/ledger/readme.md
@@ -36,7 +36,7 @@ the RocksDB project and makes it usable within Sig via RocksDB's C API and auto-

The core implementation of the ledger can be found in the [`ledger`](./) module.

## ShredCollector, ShredInserter, and Shredder
## ShredNetwork, ShredInserter, and Shredder

### Shreds

@@ -59,25 +59,25 @@ The Shred Network is responsible for gathering and storing shreds from the netwo
part of the ledger, the ledger plays a crucial role in supporting its operations. As such, the Shred Network
is implemented in its own module, i.e. [`shred_network`](../shred_network), separate from the ledger module.

A core part of the Shred Network is the `ShredCollector` which collects shreds received via the Shred Network
A core part of the Shred Network is the `ShredNetwork`, which collects shreds received via the Shred Network
and persists them in the ledger.

Understanding how the ShredCollector interacts with components from the ledger can help sheds light on key elements of the
Understanding how the ShredNetwork interacts with components from the ledger can help shed light on key elements of the
ledger’s architecture.

The following diagram illustrates the dependencies between the ShredCollector and related components of the ledger:
The following diagram illustrates the dependencies between the ShredNetwork and related components of the ledger:

```mermaid
graph TD
A[ShredCollector] --> B[ShredInserter]
A[ShredNetwork] --> B[ShredInserter]
B --> C[BlockstoreDB]
D[cleanup_service] --> C
D --> E[BlockstoreReader]
```

![ShredCollector Component](./imgs/shred_network_component.png)
![ShredNetwork Component](./imgs/shred_network_component.png)

- The **ShredCollector** utilizes:
- The **ShredNetwork** utilizes:
- The **ShredInserter** to insert shreds received from the network via Gossip.

- The **ShredInserter** relies on:
@@ -87,10 +87,10 @@ graph TD
- The **BlockstoreDB** for performing cleanup operations, also backed by RocksDB.
- The **BlockstoreReader** for reading data during cleanup.

The ShredCollector can be run standalone without running the full node.
The ShredNetwork can be run standalone without running the full node.

```
zig-out/bin/sig shred-collector --leader-schedule <path_to_leader_schedule> --network <network> --test-repair-for-slot <slot>
zig-out/bin/sig shred-network --leader-schedule <path_to_leader_schedule> --network <network> --test-repair-for-slot <slot>
```

Note: Running standalone requires manually setting the slot from which to repeatedly send repair requests for shreds, via the `test-repair-for-slot` flag, and
35 changes: 30 additions & 5 deletions src/ledger/shred_inserter/shred_inserter.zig
@@ -148,7 +148,9 @@ pub const ShredInserter = struct {
maybe_slot_leaders: ?SlotLeaders,
is_trusted: bool,
retransmit_sender: ?PointerClosure([]const []const u8, void),
shred_tracker: ?*sig.shred_network.shred_tracker.BasicShredTracker,
) !InsertShredsResult {
const timestamp = sig.time.Instant.now();
///////////////////////////
// check inputs for validity and edge cases
//
@@ -189,6 +191,16 @@
const shred_source: ShredSource = if (is_repair) .repaired else .turbine;
switch (shred) {
.data => |data_shred| {
if (shred_tracker) |tracker| {
tracker.registerDataShred(&shred.data, timestamp) catch |err| {
switch (err) {
error.SlotUnderflow, error.SlotOverflow => {
self.metrics.register_shred_error.observe(@errorCast(err));
},
else => return err,
}
};
}
if (self.checkInsertDataShred(
data_shred,
&state,
@@ -274,6 +286,16 @@
try valid_recovered_shreds.append(shred.payload()); // TODO lifetime
continue;
}
if (shred_tracker) |tracker| {
tracker.registerDataShred(&shred.data, timestamp) catch |err| {
switch (err) {
error.SlotUnderflow, error.SlotOverflow => {
self.metrics.register_shred_error.observe(@errorCast(err));
},
else => return err,
}
};
}
if (self.checkInsertDataShred(
shred.data,
&state,
@@ -285,7 +307,7 @@
)) |completed_data_sets| {
defer completed_data_sets.deinit();
try newly_completed_data_sets.appendSlice(completed_data_sets.items);
self.metrics.num_inserted.inc();
self.metrics.num_recovered_inserted.inc();
try valid_recovered_shreds.append(shred.payload()); // TODO lifetime
} else |e| switch (e) {
error.Exists => self.metrics.num_recovered_exists.inc(),
@@ -1105,6 +1127,8 @@ pub const BlockstoreInsertionMetrics = struct {
num_code_shreds_invalid_erasure_config: *Counter, // usize
num_code_shreds_inserted: *Counter, // usize

register_shred_error: *sig.prometheus.VariantCounter(sig.shred_network.shred_tracker.SlotOutOfBounds),

pub const prefix = "shred_inserter";
};

@@ -1171,7 +1195,7 @@ const ShredInserterTestState = struct {
for (0..shreds.len) |i| {
is_repairs[i] = false;
}
return self.inserter.insertShreds(shreds, is_repairs, null, false, null);
return self.inserter.insertShreds(shreds, is_repairs, null, false, null, null);
}

fn checkInsertCodeShred(
@@ -1205,7 +1229,7 @@ pub fn insertShredsForTest(
for (0..shreds.len) |i| {
is_repairs[i] = false;
}
return inserter.insertShreds(shreds, is_repairs, null, false, null);
return inserter.insertShreds(shreds, is_repairs, null, false, null, null);
}

test "insertShreds single shred" {
@@ -1214,7 +1238,7 @@
const allocator = std.testing.allocator;
const shred = try Shred.fromPayload(allocator, &ledger.shred.test_data_shred);
defer shred.deinit();
_ = try state.inserter.insertShreds(&.{shred}, &.{false}, null, false, null);
_ = try state.inserter.insertShreds(&.{shred}, &.{false}, null, false, null, null);
const stored_shred = try state.db.getBytes(
schema.data_shred,
.{ shred.commonHeader().slot, shred.commonHeader().index },
@@ -1238,7 +1262,7 @@
try shreds.append(shred);
}
_ = try state.inserter
.insertShreds(shreds.items, &(.{false} ** shred_bytes.len), null, false, null);
.insertShreds(shreds.items, &(.{false} ** shred_bytes.len), null, false, null, null);
for (shreds.items) |shred| {
const bytes = try state.db.getBytes(
schema.data_shred,
@@ -1500,6 +1524,7 @@ test "recovery" {
leader_schedule.provider(),
false,
null,
null,
);

for (data_shreds) |data_shred| {
4 changes: 2 additions & 2 deletions src/shred_network/lib.zig
@@ -9,7 +9,7 @@ pub const shred_tracker = @import("shred_tracker.zig");
pub const shred_verifier = @import("shred_verifier.zig");
pub const turbine_tree = @import("turbine_tree.zig");

pub const ShredCollectorConfig = service.ShredCollectorConfig;
pub const ShredCollectorDependencies = service.ShredCollectorDependencies;
pub const ShredNetworkConfig = service.ShredNetworkConfig;
pub const ShredNetworkDependencies = service.ShredNetworkDependencies;

pub const start = service.start;