perf(rpc): io_uring integration & redesign (#477)
* RPC server: io_uring upgrade
Separates the server into two parts: the context and the work pool.
The context contains everything generally needed to run the server;
the work pool contains a statically polymorphic implementation of a
pool that the actual work is dispatched to.
In doing this, we also split certain things out into a few separate
files.
The RPC server context API has been modified slightly to reflect this,
and the work pool is directly exposed, for now.
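
A minimal sketch of the split (hypothetical names; the real fields and
backends differ in detail):

```zig
const std = @import("std");

/// Stand-ins for the real backends; only the shape matters here.
const BasicPool = struct {
    pub fn serve(_: *BasicPool, _: *Context) !void {}
};
const IoUringPool = struct {
    pub fn serve(_: *IoUringPool, _: *Context) !void {}
};

/// Everything generally needed to run the server.
pub const Context = struct {
    allocator: std.mem.Allocator,
    tcp: std.net.Server,
};

/// Statically polymorphic: a comptime-known set of variants dispatched
/// with `inline else`, rather than a vtable.
pub const WorkPool = union(enum) {
    basic: *BasicPool,
    linux_io_uring: *IoUringPool,

    pub fn serve(pool: WorkPool, ctx: *Context) !void {
        switch (pool) {
            inline else => |impl| try impl.serve(ctx),
        }
    }
};
```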

* Don't use file-as-struct

* Run style script, respect line length limit

* Improve accept failure handling & update TODOs
* Handle potential failure/cancellation of `accept_multishot` by
  re-queueing it, based on the `IORING_CQE_F_MORE` flag (see the sketch
  after this list).
* Revise/simplify the queueing logic for the `accept_multishot` SQE.
* Resolve the EINTR TODO panics, returning a catch-all error value
  that marks it as a bad but non-critical error.
* Update the `a: ?noreturn` `if (a) |*b|` TODO, noting that it's fixed
  in Zig 0.14; it should be resolved once we update to 0.14.
* Unify EAGAIN panic message.
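
A minimal sketch of the re-queue check, assuming Zig 0.13's
`std.os.linux.IoUring` API (`handleAcceptCqe` and `ACCEPT_USER_DATA`
are hypothetical names):

```zig
const std = @import("std");
const linux = std.os.linux;

const ACCEPT_USER_DATA: u64 = 0; // hypothetical tag for the accept SQE

fn handleAcceptCqe(
    ring: *linux.IoUring,
    server_fd: linux.fd_t,
    cqe: linux.io_uring_cqe,
) !void {
    // With IORING_CQE_F_MORE clear, the kernel has stopped (failed or
    // cancelled) the multishot accept and will post no further
    // completions for it, so it must be re-queued.
    if ((cqe.flags & linux.IORING_CQE_F_MORE) == 0) {
        _ = try ring.accept_multishot(ACCEPT_USER_DATA, server_fd, null, null, 0);
    }
    // ... then handle cqe.res (a new connection fd, or -errno) ...
}
```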

* Add TODO to remove hacky-ish workaround

* Use `self: Type` convention

* A few minor fixups and improvements

* Simplify test, make server socket nonblocking
On macOS, with the basic WorkPool, this means we now need to manually
set the accepted socket's flags to the right thing, i.e. blocking,
since on macOS the accepted socket inherits the server socket's
nonblocking mode.
It also means we have to handle EAGAIN a bit differently in the
io_uring backend, but that's a fine tradeoff.
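
A sketch of the macOS-side fix, assuming plain POSIX `fcntl`
(`makeBlocking` is a hypothetical name):

```zig
const std = @import("std");
const posix = std.posix;

/// On macOS/BSD an accepted socket inherits the listener's O_NONBLOCK
/// flag (unlike on Linux), so clear it to get a blocking connection.
fn makeBlocking(fd: posix.fd_t) !void {
    const nonblock: usize = 1 << @bitOffsetOf(posix.O, "NONBLOCK");
    const flags = try posix.fcntl(fd, posix.F.GETFL, 0);
    _ = try posix.fcntl(fd, posix.F.SETFL, flags & ~nonblock);
}
```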

* server.zig -> server/lib.zig

* Split out the basic backend
And re-organize some methods based on that change

* Re-organize server module

* Update LOGGER_SCOPE

* Simplify & improve io_uring backend error handling

* De-scope `accept_flags`

* Simplify `can_use` for linux cross-compilation
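
Presumably reducing to a comptime target check (a hypothetical sketch;
the actual `can_use` logic may differ, and real kernel support still
has to be probed at runtime):

```zig
const builtin = @import("builtin");

// When cross-compiling, only the *target* is known at comptime, so
// candidacy of the io_uring backend reduces to a target-OS check.
pub const can_use: bool = builtin.os.tag == .linux;
```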

* (io_uring) Rework error handling, add timeout
Do not exit for *any* errors that are specific to the related
connection; simply free the connection's resources and continue to the
next CQE.

Specifically, in the case of `error.SubmissionQueueFull`, instead of
immediately failing, we now first try to flush the submission queue and
then try again to submit; if it fails a second time, the queue is
somehow still full despite the flush, so we panic, since this indicates
something is *very* wrong.
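
A minimal sketch of the flush-and-retry, assuming Zig 0.13's `IoUring`
API (`getSqeRetrying` is a hypothetical name):

```zig
const std = @import("std");
const linux = std.os.linux;

fn getSqeRetrying(ring: *linux.IoUring) *linux.io_uring_sqe {
    return ring.get_sqe() catch {
        // error.SubmissionQueueFull: flush what we have, then try once more.
        _ = ring.submit() catch |err|
            std.debug.panic("failed to flush submission queue: {s}", .{@errorName(err)});
        return ring.get_sqe() catch
            std.debug.panic("submission queue still full after flush", .{});
    };
}
```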

This also eliminates the `pending_cqes_buf`, since there is actually
no situation in which `consumeOurCqe` returns an error and we resume
work afterwards: either we process all the received CQEs, or we hard
exit. This was already essentially the case before; now it's just more
obvious.
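
The resulting drain shape, sketched with hypothetical signatures
(`consumeOurCqe` is the real function's name per the message above;
its stub here is illustrative):

```zig
const std = @import("std");
const linux = std.os.linux;

fn consumeOurCqe(ring: *linux.IoUring, cqe: linux.io_uring_cqe) error{OutOfMemory}!void {
    _ = ring;
    _ = cqe; // stub: connection-specific failures are freed & skipped inside
}

fn drainCqes(ring: *linux.IoUring) void {
    var cqes: [64]linux.io_uring_cqe = undefined;
    const n = ring.copy_cqes(&cqes, 0) catch |err|
        std.debug.panic("copy_cqes: {s}", .{@errorName(err)});
    for (cqes[0..n]) |cqe| {
        // Either the CQE is fully handled, or we hard exit; errors never
        // propagate to a caller that resumes work.
        consumeOurCqe(ring, cqe) catch |err|
            std.debug.panic("fatal: {s}", .{@errorName(err)});
    }
}
```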

For the main submit, we now wait for at least one completion, but we
also add a timeout SQE so that the wait terminates if we don't receive
a connection or the completion of another task within 1 second; this
alleviates the busy loop that was running before.
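
A sketch of the submit-with-timeout, assuming Zig 0.13's `IoUring` API
(`TIMEOUT_USER_DATA` and the function name are hypothetical):

```zig
const std = @import("std");
const linux = std.os.linux;

const TIMEOUT_USER_DATA: u64 = std.math.maxInt(u64);

fn submitAndWaitOneSecond(ring: *linux.IoUring) !void {
    // Arm a 1-second timeout, then block for at least one CQE: a new
    // connection, a finished task, or the timeout itself. No busy loop.
    const ts: linux.kernel_timespec = .{ .tv_sec = 1, .tv_nsec = 0 };
    _ = try ring.timeout(TIMEOUT_USER_DATA, &ts, 0, 0);
    _ = try ring.submit_and_wait(1);
}
```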

* (io_uring) Remove multishot_accept_submitted
Also slightly refactor error sets.
Now, instead of checking whether we need to set a flag to re-queue the
multishot accept, we just pass the server context in on init and queue
it there, which makes sense now that the context and work pool are
separate.
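
A sketch of the init-time arming, with hypothetical names:

```zig
const std = @import("std");
const linux = std.os.linux;

const ACCEPT_USER_DATA: u64 = 0; // hypothetical tag, as above

const IoUringBackend = struct {
    ring: linux.IoUring,

    /// The context (and thus the listening socket) exists independently
    /// of the pool, so the multishot accept can be armed once, up front;
    /// completions only re-arm it when IORING_CQE_F_MORE is clear.
    pub fn init(server_fd: linux.fd_t) !IoUringBackend {
        var ring = try linux.IoUring.init(256, 0);
        errdefer ring.deinit();
        _ = try ring.accept_multishot(ACCEPT_USER_DATA, server_fd, null, null, 0);
        return .{ .ring = ring };
    }
};
```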

* (io_uring) Simplify new entry creation
Also add a fix for rebase

* Misc fixups

* Re-organize alias/import

* General restructure
* Move more specific functions into the only files where they're used.
* Move the `serve*` functions outside of `Context`, making them
  free functions which just accept the context and work pool (sketched
  below).
* Remove `acceptAndServeConnection`; originally this was required to
  nicely structure the unit test, and used to be more integrated, but
  it no longer makes sense as a concept.
* Inline `handleRequest` into the basic backend.
* Make the `acceptHandled` function, moved into the basic backend,
  guarantee the specified `sync` behavior, and inline `have_accept4`.
* Appropriately re-export the relevant parts of the server API.
* Add top-level doc comments.
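
The resulting shape, reusing the `Context`/`WorkPool` sketch from the
first bullet above (signatures hypothetical):

```zig
/// Free function over the two parts rather than a method on `Context`.
pub fn serve(ctx: *Context, pool: WorkPool) !void {
    // Each backend drives its own accept/handle loop.
    try pool.serve(ctx);
}
```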

* Re-organize loggers & scopes

* Refactor `build_options` imports

* Add `no_network_tests` build option
And disable the rpc server test when it is enabled
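
The guard presumably looks something like this (hypothetical import
path and test body; `sig.build_options` is the re-export visible in the
diffs below):

```zig
const sig = @import("../../sig.zig"); // hypothetical path

test "rpc server" {
    if (sig.build_options.no_network_tests) return error.SkipZigTest;
    // ... spin up the server and exercise it over a real socket ...
}
```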

* Update circleci with `-Dno-network-tests`
InKryption authored Feb 10, 2025
1 parent f42565a commit 694aa99
Showing 20 changed files with 1,833 additions and 579 deletions.
5 changes: 3 additions & 2 deletions .circleci/config.yml
@@ -153,7 +153,8 @@ jobs:
             key: linux-x86_64-0.13.0-{{ checksum "build.zig.zon" }}-v6
       - run:
           name: Test
-          command: workspace/zig/zig build test -Dcpu=x86_64_v3 -Denable-tsan=true --color off --summary all
+          # Disable network-accessing tests for this job, which behave badly on circleci
+          command: workspace/zig/zig build test -Dcpu=x86_64_v3 -Denable-tsan=true -Dno-network-tests --color off --summary all

   test_kcov_linux:
     executor: linux-executor
@@ -165,7 +166,7 @@ jobs:
             key: linux-x86_64-0.13.0-{{ checksum "build.zig.zon" }}-v6
       - run:
           name: Build
-          command: workspace/zig/zig build test -Dcpu=x86_64_v3 -Denable-tsan=false -Dno-run --summary all
+          command: workspace/zig/zig build test -Dcpu=x86_64_v3 -Denable-tsan=false -Dno-run -Dno-network-tests --summary all
       - run:
           name: Test and Collect
           command: |
2 changes: 2 additions & 0 deletions build.zig
@@ -20,10 +20,12 @@ pub fn build(b: *Build) void {
         \\Don't install any of the binaries implied by the specified steps, only run them.
         \\Use in conjunction with 'no-run' to avoid running as well.
     ) orelse false;
+    const no_network_tests = b.option(bool, "no-network-tests", "Do not run any tests that depend on the network.") orelse false;

     // Build options
     const build_options = b.addOptions();
     build_options.addOption(BlockstoreDB, "blockstore_db", blockstore_db);
+    build_options.addOption(bool, "no_network_tests", no_network_tests);

     // CLI build steps
     const install_step = b.getInstallStep();
1 change: 1 addition & 0 deletions src/accountsdb/db.zig
@@ -3350,6 +3350,7 @@ test "testWriteSnapshot" {
     );
 }

+/// Unpacks the snapshots from `sig.TEST_DATA_DIR`.
 pub fn findAndUnpackTestSnapshots(
     n_threads: usize,
     /// The directory into which the snapshots are unpacked.
11 changes: 6 additions & 5 deletions src/accountsdb/snapshots.zig
@@ -2215,7 +2215,7 @@ pub const FullSnapshotFileInfo = struct {
     slot: Slot,
     hash: Hash,

-    const SnapshotArchiveNameFmtSpec = sig.utils.fmt.BoundedSpec("snapshot-{[slot]d}-{[hash]s}.tar.zst");
+    pub const SnapshotArchiveNameFmtSpec = sig.utils.fmt.BoundedSpec("snapshot-{[slot]d}-{[hash]s}.tar.zst");

     pub const SnapshotArchiveNameStr = SnapshotArchiveNameFmtSpec.BoundedArrayValue(.{
         .slot = std.math.maxInt(Slot),
@@ -2341,7 +2341,7 @@ pub const IncrementalSnapshotFileInfo = struct {
         };
     }

-    const SnapshotArchiveNameFmtSpec = sig.utils.fmt.BoundedSpec("incremental-snapshot-{[base_slot]d}-{[slot]d}-{[hash]s}.tar.zst");
+    pub const SnapshotArchiveNameFmtSpec = sig.utils.fmt.BoundedSpec("incremental-snapshot-{[base_slot]d}-{[slot]d}-{[hash]s}.tar.zst");

     pub const SnapshotArchiveNameStr = SnapshotArchiveNameFmtSpec.BoundedArrayValue(.{
         .base_slot = std.math.maxInt(Slot),
@@ -2486,15 +2486,16 @@ pub const SnapshotFiles = struct {
     full: FullSnapshotFileInfo,
     incremental_info: ?SlotAndHash,

-    pub fn incremental(snapshot_files: SnapshotFiles) ?IncrementalSnapshotFileInfo {
-        const inc_info = snapshot_files.incremental_info orelse return null;
+    pub fn incremental(self: SnapshotFiles) ?IncrementalSnapshotFileInfo {
+        const inc_info = self.incremental_info orelse return null;
         return .{
-            .base_slot = snapshot_files.full.slot,
+            .base_slot = self.full.slot,
             .slot = inc_info.slot,
             .hash = inc_info.hash,
         };
     }

+    /// Asserts that `if (maybe_incremental_info) |inc| inc.base_slot == full_info.slot`.
     pub fn fromFileInfos(
         full_info: FullSnapshotFileInfo,
         maybe_incremental_info: ?IncrementalSnapshotFileInfo,
4 changes: 2 additions & 2 deletions src/ledger/blockstore.zig
@@ -1,7 +1,7 @@
-const build_options = @import("build-options");
+const sig = @import("../sig.zig");
 const ledger = @import("lib.zig");

-pub const BlockstoreDB = switch (build_options.blockstore_db) {
+pub const BlockstoreDB = switch (sig.build_options.blockstore_db) {
     .rocksdb => ledger.database.RocksDB(&ledger.schema.list),
     .hashmap => ledger.database.SharedHashMapDB(&ledger.schema.list),
 };
3 changes: 1 addition & 2 deletions src/ledger/cleanup_service.zig
@@ -1,4 +1,3 @@
-const build_options = @import("build-options");
 const std = @import("std");
 const sig = @import("../sig.zig");
 const ledger = @import("lib.zig");
@@ -461,7 +460,7 @@ test "findSlotsToClean" {
     }
     // When implementation is rocksdb, we need to flush memtable to disk to be able to assert.
     // We do that by deiniting the current db, which triggers the flushing.
-    if (build_options.blockstore_db == .rocksdb) {
+    if (sig.build_options.blockstore_db == .rocksdb) {
         db.deinit();
         db = try TestDB.reuseBlockstore(@src());
         reader.db = db;
1 change: 0 additions & 1 deletion src/ledger/database/hashmap.zig
@@ -1,7 +1,6 @@
 const std = @import("std");
 const sig = @import("../../sig.zig");
 const database = @import("lib.zig");
-const build_options = @import("build-options");

 const Allocator = std.mem.Allocator;
 const RwLock = std.Thread.RwLock;
3 changes: 1 addition & 2 deletions src/ledger/database/rocksdb.zig
@@ -2,7 +2,6 @@ const std = @import("std");
 const rocks = @import("rocksdb");
 const sig = @import("../../sig.zig");
 const database = @import("lib.zig");
-const build_options = @import("build-options");

 const Allocator = std.mem.Allocator;

@@ -348,7 +347,7 @@ fn callRocks(logger: ScopedLogger(LOG_SCOPE), comptime func: anytype, args: anytype)
 }

 comptime {
-    if (build_options.blockstore_db == .rocksdb) {
+    if (sig.build_options.blockstore_db == .rocksdb) {
         _ = &database.interface.testDatabase(RocksDB);
     }
 }
5 changes: 2 additions & 3 deletions src/ledger/fuzz.zig
@@ -1,6 +1,5 @@
 const std = @import("std");
 const sig = @import("../sig.zig");
-const build_options = @import("build-options");
 const ledger = @import("lib.zig");

 const ColumnFamily = sig.ledger.database.ColumnFamily;
@@ -19,7 +18,7 @@ const cf1 = ColumnFamily{

 var executed_actions = std.AutoHashMap(Actions, void).init(allocator);

-pub const BlockstoreDB = switch (build_options.blockstore_db) {
+pub const BlockstoreDB = switch (sig.build_options.blockstore_db) {
     .rocksdb => ledger.database.RocksDB(&.{cf1}),
     .hashmap => ledger.database.SharedHashMapDB(&.{cf1}),
 };
@@ -198,7 +197,7 @@ fn dbCount(
     try executed_actions.put(Actions.count, {});
     // TODO Fix why changes are not reflected in count with rocksdb implementation,
     // but it does with hashmap.
-    if (build_options.blockstore_db == .rocksdb) {
+    if (sig.build_options.blockstore_db == .rocksdb) {
         return;
     }
3 changes: 1 addition & 2 deletions src/rpc/lib.zig
@@ -1,12 +1,11 @@
 pub const client = @import("client.zig");
-pub const server = @import("server.zig");
+pub const server = @import("server/lib.zig");

 pub const request = @import("request.zig");
 pub const response = @import("response.zig");
 pub const types = @import("types.zig");

 pub const Client = client.Client;
-pub const Server = server.Server;

 pub const Request = request.Request;
 pub const Response = response.Response;