feat(channel): replace with a faster implementation (#252)
* channel: replace channel with a better implementation

* start integrating channel

* add some test cases for the channel

* only look at the final `gossip_table` once we've exited

* fix even more races in the test suite

* enable tsan in the test CI

* remove all usages of `unordered` from the codebase

* remove `PacketBatch` in favour of sending individual packets

* choose better time units for benchmarks

* don't use identifiers that start with an underscore

* remove the old channel implementation

* add some documentation and clean up the code

* remove stale code and improve gossip value filtering

* implement an exit order sync method

* cleanup allocation even more

* remove the last usages of close

* clarify `CONTRIBUTING.md` and remove `initial_capacity` from the channel initialization

* gossip: revert filter optimization
- Also, don't take ownership of the pull-request gossip values; they aren't ours to free. This was triggering the GPA's double-free detection.
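
  A minimal sketch of the ownership rule (names like `GossipValue` and `handlePullRequest` are hypothetical, not the actual gossip code):

  ```zig
  const std = @import("std");

  const GossipValue = struct { data: u64 };

  fn process(value: GossipValue) void {
      _ = value;
  }

  // `values` is borrowed from the sender: the producer frees it after this
  // handler returns, so freeing it here double-frees (caught by the GPA).
  fn handlePullRequest(allocator: std.mem.Allocator, values: []const GossipValue) void {
      _ = allocator; // kept only to show what the buggy version misused
      for (values) |value| process(value); // read-only use is fine
      // BUG removed by this commit:
      // allocator.free(values);
  }
  ```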

* gossip: add back missing `clearRetainingCapacity`

* grafana: fix overlapping timestamps

* fix racy deinit for gossip service

* add some comments

* remove `.discard` flag and fix up some logic

* remove unused variable

* remove UAF from gossip

So this one was really hard to find. In Zig, storing a pointer to a by-value capture (and using it later) is not well defined. LLVM is allowed to create induction stack space when optimizing loops and to overwrite the stack slot the pointer referred to. Here we were storing a pointer to the capture, probably as a premature optimization, and it was getting overwritten. Storing a pointer to the prune message wouldn't have been an optimization anyway, since the capture is still a stack copy.
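
A reduced sketch of the pattern (`PruneMessage` here is a stand-in type, not the real gossip code):

```zig
const PruneMessage = struct { slot: u64 };

fn lastMessageSlot(messages: []const PruneMessage) u64 {
    // BUG pattern: `msg` is a fresh stack copy each iteration, so keeping
    // `&msg` past the iteration is not well defined -- LLVM may reuse that
    // stack space for loop bookkeeping and overwrite it.
    var saved_ptr: ?*const PruneMessage = null;
    for (messages) |msg| {
        saved_ptr = &msg;
    }
    _ = saved_ptr; // dereferencing this later would read clobbered stack memory

    // Fix: store a copy of the value, since the capture is a stack copy anyway.
    var saved: ?PruneMessage = null;
    for (messages) |msg| {
        saved = msg;
    }
    return if (saved) |m| m.slot else 0;
}
```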

* remove unneeded free

* fix shred collector exiting early

* cmd: remember to join on errors so they don't keep running

* fix(shred-collector): not processing shreds

There was a bool being used to decide whether to pass packets to the shred verifier, and it was being negated too many times. I fixed the logic.

I think the bug was introduced because the logic was spread across too many scopes, which made it confusing. I tried to make it clearer by moving the logic into a narrower scope.
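
A hedged sketch of the shape of the fix (field and function names are hypothetical): the decision is computed once, in one narrow scope, instead of a flag being negated as it passes through several scopes.

```zig
const Packet = struct {
    /// Hypothetical flag standing in for the real shred metadata.
    is_repair: bool,
};

/// Decide whether a packet goes to the shred verifier, computed once in a
/// single scope instead of negating a flag across several scopes.
fn shouldVerify(packet: Packet) bool {
    return !packet.is_repair;
}
```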

---------

Co-authored-by: Drew Nutter <[email protected]>
Rexicon226 and dnut authored Oct 1, 2024
1 parent 9ccbaa9 commit 2cd4d3f
Showing 49 changed files with 1,709 additions and 1,613 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
@@ -54,7 +54,7 @@ jobs:
version: 0.13.0

- name: test
- run: zig build test
+ run: zig build test -Denable-tsan=true

kcov_test:
strategy:
4 changes: 4 additions & 0 deletions build.zig
@@ -59,6 +59,7 @@ pub fn build(b: *Build) void {
.root_source_file = b.path("src/main.zig"),
.target = target,
.optimize = optimize,
+ .sanitize_thread = enable_tsan,
});
b.installArtifact(sig_exe);
sig_exe.root_module.addImport("base58-zig", base58_module);
@@ -116,6 +117,7 @@ pub fn build(b: *Build) void {
.root_source_file = b.path("src/fuzz.zig"),
.target = target,
.optimize = optimize,
+ .sanitize_thread = enable_tsan,
});
b.installArtifact(fuzz_exe);
fuzz_exe.root_module.addImport("base58-zig", base58_module);
@@ -134,6 +136,7 @@ pub fn build(b: *Build) void {
.root_source_file = b.path("src/benchmarks.zig"),
.target = target,
.optimize = optimize,
+ .sanitize_thread = enable_tsan,
});
b.installArtifact(benchmark_exe);
benchmark_exe.root_module.addImport("base58-zig", base58_module);
@@ -152,6 +155,7 @@ pub fn build(b: *Build) void {
.root_source_file = b.path("src/geyser/reader.zig"),
.target = target,
.optimize = optimize,
+ .sanitize_thread = enable_tsan,
});
b.installArtifact(geyser_reader_exe);
geyser_reader_exe.root_module.addImport("sig", sig_mod);
12 changes: 12 additions & 0 deletions docs/CONTRIBUTING.md
@@ -10,6 +10,18 @@ See the [readme](../readme.md#-setup).

## Style Guide

### Identifiers

We follow the [Zig Style Guide](https://ziglang.org/documentation/master/#Style-Guide) where reasonable.

- Leading underscores in identifiers (e.g. `_foo` or `__bar`) are discouraged unless used in the correct context.
A valid use is a pseudo-private field, which users are not meant to touch. Private fields should have a docstring
explaining that they are private. Before creating a private field, consider whether it needs to be a field at all; there should be few of them.
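
For instance, a pseudo-private field under this convention might look like (illustrative example, not code from the repo):

```zig
pub const Counter = struct {
    /// Private field: do not access directly; use `increment` and `get`.
    _count: u64 = 0,

    pub fn increment(self: *Counter) void {
        self._count += 1;
    }

    pub fn get(self: *const Counter) u64 {
        return self._count;
    }
};
```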

### Structs

We don't use Zig's file-based structs in the codebase: declare structs as `pub const Foo = struct { ... };` rather than making a file itself a struct via `@This()`.
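
An illustrative example of the two forms (names hypothetical):

```zig
// Preferred: a named struct declaration inside any file.
pub const Foo = struct {
    bar: u32,
};

// Avoided: making the file itself the struct, i.e. a `foo.zig`
// whose top level consists of fields plus `@This()`:
//
//   // foo.zig
//   bar: u32,
//   const Foo = @This();
```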

### Imports

#### Sig Dependencies
64 changes: 39 additions & 25 deletions src/accountsdb/db.zig
@@ -230,7 +230,7 @@ pub const AccountsDB = struct {
break :blk .{ ptr, ptr.allocator() };
} else {
logger.infof("using ram index", .{});
- break :blk .{ null, std.heap.page_allocator };
+ break :blk .{ null, allocator };
}
};
errdefer if (maybe_disk_allocator_ptr) |ptr| {
@@ -307,6 +307,8 @@ pub const AccountsDB = struct {
/// easier to use load function
pub fn loadWithDefaults(
self: *Self,
+ /// needs to be a thread-safe allocator
+ allocator: std.mem.Allocator,
snapshot_fields_and_paths: *AllSnapshotFields,
n_threads: u32,
validate: bool,
@@ -317,7 +319,7 @@
const load_duration = try self.loadFromSnapshot(
snapshot_fields.accounts_db_fields,
n_threads,
- std.heap.page_allocator,
+ allocator,
accounts_per_file_estimate,
);
self.logger.infof("loaded from snapshot in {s}", .{load_duration});
@@ -342,6 +344,7 @@
/// Account file info map from the snapshot manifest.
snapshot_manifest: AccountsDbFields,
n_threads: u32,
+ /// needs to be a thread-safe allocator
per_thread_allocator: std.mem.Allocator,
accounts_per_file_estimate: u64,
) !sig.time.Duration {
@@ -624,8 +627,8 @@

file_map.putAssumeCapacityNoClobber(file_id, accounts_file);
self.largest_file_id = FileId.max(self.largest_file_id, file_id);
- _ = self.largest_rooted_slot.fetchMax(slot, .monotonic);
- self.largest_flushed_slot.store(self.largest_rooted_slot.load(.monotonic), .monotonic);
+ _ = self.largest_rooted_slot.fetchMax(slot, .release);
+ self.largest_flushed_slot.store(self.largest_rooted_slot.load(.acquire), .release);

if (print_progress and progress_timer.read().asNanos() > DB_LOG_RATE.asNanos()) {
printTimeEstimate(
@@ -749,7 +752,7 @@
}
}
self.largest_file_id = FileId.max(self.largest_file_id, thread_db.largest_file_id);
_ = self.largest_rooted_slot.fetchMax(thread_db.largest_rooted_slot.load(.unordered), .monotonic);
_ = self.largest_rooted_slot.fetchMax(thread_db.largest_rooted_slot.load(.acquire), .monotonic);
self.largest_flushed_slot.store(self.largest_rooted_slot.load(.monotonic), .monotonic);

// combine underlying memory
@@ -1199,7 +1202,7 @@
var tmp_bank_fields = try BankFields.random(self.allocator, rand.random(), 128);
defer tmp_bank_fields.deinit(self.allocator);

- while (!exit.load(.monotonic)) {
+ while (!exit.load(.acquire)) {
defer {
const elapsed = timer.lap();
if (elapsed < DB_MANAGER_LOOP_MIN.asNanos()) {
@@ -3075,11 +3078,11 @@ pub fn writeSnapshotTarWithFields(
}

fn testWriteSnapshotFull(
+ allocator: std.mem.Allocator,
accounts_db: *AccountsDB,
slot: Slot,
maybe_expected_hash: ?Hash,
) !void {
- const allocator = std.testing.allocator;
const snapshot_dir = accounts_db.snapshot_dir;

const manifest_path_bounded = sig.utils.fmt.boundedFmt("snapshots/{0}/{0}", .{slot});
@@ -3135,11 +3138,11 @@ fn testWriteSnapshotFull(
}

fn testWriteSnapshotIncremental(
+ allocator: std.mem.Allocator,
accounts_db: *AccountsDB,
slot: Slot,
maybe_expected_incremental_hash: ?Hash,
) !void {
- const allocator = std.testing.allocator;
const snapshot_dir = accounts_db.snapshot_dir;

const manifest_path_bounded = sig.utils.fmt.boundedFmt("snapshots/{0}/{0}", .{slot});
@@ -3196,10 +3199,11 @@ fn testWriteSnapshotIncremental(
}

test "testWriteSnapshot" {
+ const allocator = std.testing.allocator;
var test_data_dir = try std.fs.cwd().openDir(sig.TEST_DATA_DIR, .{ .iterate = true });
defer test_data_dir.close();

- const snap_files = try SnapshotFiles.find(std.testing.allocator, test_data_dir);
+ const snap_files = try SnapshotFiles.find(allocator, test_data_dir);

var tmp_snap_dir_root = std.testing.tmpDir(.{});
defer tmp_snap_dir_root.cleanup();
@@ -3208,23 +3212,33 @@ test "testWriteSnapshot" {
{
const archive_file = try test_data_dir.openFile(snap_files.full_snapshot.snapshotNameStr().constSlice(), .{});
defer archive_file.close();
- try parallelUnpackZstdTarBall(std.testing.allocator, .noop, archive_file, tmp_snap_dir, 4, true);
+ try parallelUnpackZstdTarBall(allocator, .noop, archive_file, tmp_snap_dir, 4, true);
}

if (snap_files.incremental_snapshot) |inc_snap| {
const archive_file = try test_data_dir.openFile(inc_snap.snapshotNameStr().constSlice(), .{});
defer archive_file.close();
- try parallelUnpackZstdTarBall(std.testing.allocator, .noop, archive_file, tmp_snap_dir, 4, false);
+ try parallelUnpackZstdTarBall(allocator, .noop, archive_file, tmp_snap_dir, 4, false);
}

- var accounts_db = try AccountsDB.init(std.testing.allocator, .noop, tmp_snap_dir, .{
+ var accounts_db = try AccountsDB.init(allocator, .noop, tmp_snap_dir, .{
.number_of_index_bins = ACCOUNT_INDEX_BINS,
.use_disk_index = false,
}, null);
defer accounts_db.deinit();

- try testWriteSnapshotFull(&accounts_db, snap_files.full_snapshot.slot, snap_files.full_snapshot.hash);
- try testWriteSnapshotIncremental(&accounts_db, snap_files.incremental_snapshot.?.slot, snap_files.incremental_snapshot.?.hash);
+ try testWriteSnapshotFull(
+     allocator,
+     &accounts_db,
+     snap_files.full_snapshot.slot,
+     snap_files.full_snapshot.hash,
+ );
+ try testWriteSnapshotIncremental(
+     allocator,
+     &accounts_db,
+     snap_files.incremental_snapshot.?.slot,
+     snap_files.incremental_snapshot.?.hash,
+ );
}

fn unpackTestSnapshot(allocator: std.mem.Allocator, n_threads: usize) !void {
@@ -3328,13 +3342,14 @@ test "geyser stream on load" {
try geyser_writer.?.spawnIOLoop();

const reader_handle = try std.Thread.spawn(.{}, sig.geyser.core.streamReader, .{
+ allocator,
geyser_exit,
geyser_pipe_path,
null,
null,
});
defer {
- geyser_exit.store(true, .unordered);
+ geyser_exit.store(true, .release);
_ = reader_handle.join();
}

@@ -3368,7 +3383,7 @@
test "write and read an account" {
const allocator = std.testing.allocator;

- var accounts_db, var snapshots = try loadTestAccountsDB(std.testing.allocator, false, 1);
+ var accounts_db, var snapshots = try loadTestAccountsDB(allocator, false, 1);
defer {
accounts_db.deinit();
snapshots.deinit(allocator);
@@ -3405,7 +3420,7 @@ test "write and read an account" {
test "load and validate from test snapshot using disk index" {
const allocator = std.testing.allocator;

- var accounts_db, var snapshots = try loadTestAccountsDB(std.testing.allocator, false, 1);
+ var accounts_db, var snapshots = try loadTestAccountsDB(allocator, false, 1);
defer {
accounts_db.deinit();
snapshots.deinit(allocator);
@@ -3422,7 +3437,7 @@
test "load and validate from test snapshot parallel" {
const allocator = std.testing.allocator;

- var accounts_db, var snapshots = try loadTestAccountsDB(std.testing.allocator, false, 2);
+ var accounts_db, var snapshots = try loadTestAccountsDB(allocator, false, 2);
defer {
accounts_db.deinit();
snapshots.deinit(allocator);
@@ -3439,7 +3454,7 @@
test "load and validate from test snapshot" {
const allocator = std.testing.allocator;

- var accounts_db, var snapshots = try loadTestAccountsDB(std.testing.allocator, false, 1);
+ var accounts_db, var snapshots = try loadTestAccountsDB(allocator, false, 1);
defer {
accounts_db.deinit();
snapshots.deinit(allocator);
@@ -3456,7 +3471,7 @@
test "load clock sysvar" {
const allocator = std.testing.allocator;

- var accounts_db, var snapshots = try loadTestAccountsDB(std.testing.allocator, false, 1);
+ var accounts_db, var snapshots = try loadTestAccountsDB(allocator, false, 1);
defer {
accounts_db.deinit();
snapshots.deinit(allocator);
@@ -3476,7 +3491,7 @@
test "load other sysvars" {
const allocator = std.testing.allocator;

- var accounts_db, var snapshots = try loadTestAccountsDB(std.testing.allocator, false, 1);
+ var accounts_db, var snapshots = try loadTestAccountsDB(allocator, false, 1);
defer {
accounts_db.deinit();
snapshots.deinit(allocator);
@@ -3526,7 +3541,7 @@ test "flushing slots works" {
try accounts_db.putAccountSlice(&accounts, &pubkeys, slot);

// this writes to disk
- var unclean_account_files = ArrayList(FileId).init(std.testing.allocator);
+ var unclean_account_files = ArrayList(FileId).init(allocator);
defer unclean_account_files.deinit();
try unclean_account_files.append(try accounts_db.flushSlot(slot));

@@ -4001,7 +4016,7 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
};

pub fn loadSnapshot(bench_args: BenchArgs) !u64 {
- const allocator = std.heap.page_allocator;
+ const allocator = std.heap.c_allocator;

// unpack the snapshot
// NOTE: usually this will be an incremental snapshot
@@ -4213,8 +4228,8 @@
const slot_list_len = bench_args.slot_list_len;
const total_n_accounts = n_accounts * slot_list_len;

- var gpa = std.heap.GeneralPurposeAllocator(.{}){};
- var allocator = gpa.allocator();
+ var allocator = std.heap.c_allocator;

const disk_path = sig.TEST_DATA_DIR ++ "tmp/";
std.fs.cwd().makeDir(disk_path) catch {};