diff --git a/src/rpc/server/basic.zig b/src/rpc/server/basic.zig index da7422756..fac38b246 100644 --- a/src/rpc/server/basic.zig +++ b/src/rpc/server/basic.zig @@ -68,6 +68,8 @@ pub fn acceptAndServeConnection(server_ctx: *server.Context) !void { .content_length = archive_len, .respond_options = .{}, }); + // flush the headers, so that if this is a head request, we can mock the response without doing unnecessary work + try response.flush(); if (!response.elide_body) { // use a length which is still a multiple of 2, greater than the send_buffer length, @@ -97,18 +99,31 @@ pub fn acceptAndServeConnection(server_ctx: *server.Context) !void { try response.end(); return; }, - .unrecognized => {}, + .health => { + try request.respond("unknown", .{ + .status = .ok, + .keep_alive = false, + }); + return; + }, + + .genesis_file => {}, + + .not_found => {}, }, .POST => { logger.err().logf("{} tried to invoke our RPC", .{conn_address}); - return try request.respond("RPCs are not yet implemented", .{ + try request.respond("RPCs are not yet implemented", .{ .status = .service_unavailable, .keep_alive = false, }); + return; }, else => {}, } + // fallthrough to 404 Not Found + logger.err().logf( "{} made an unrecognized request '{} {s}'", .{ conn_address, requests.methodFmt(request.head.method), request.head.target }, diff --git a/src/rpc/server/linux_io_uring.zig b/src/rpc/server/linux_io_uring.zig index f91621b5a..c8edf8f36 100644 --- a/src/rpc/server/linux_io_uring.zig +++ b/src/rpc/server/linux_io_uring.zig @@ -10,6 +10,8 @@ const IoUring = std.os.linux.IoUring; const LOGGER_SCOPE = "rpc.server.linux_io_uring"; +const HTTP_VERSION: std.http.Version = .@"HTTP/1.0"; + pub const LinuxIoUring = struct { io_uring: IoUring, @@ -60,7 +62,7 @@ pub const LinuxIoUring = struct { const timeout_sqe = try getSqeRetry(&self.io_uring); timeout_sqe.prep_timeout(&timeout_ts, 1, 0); - timeout_sqe.user_data = 1; + timeout_sqe.user_data = @bitCast(Entry.TIMEOUT); _ = try self.io_uring.submit_and_wait(1); @@ -69,9 +71,6 @@ pub const LinuxIoUring = struct { const cqes_pending = pending_cqes_buf[0..pending_cqes_count]; for (cqes_pending) |raw_cqe| { - // NOTE(ink): this is kind of hacky, should try refactoring this to use DOD-like indexes instead of pointers, - // that way we can allocate special static indexes instead of this. - if (raw_cqe.user_data == timeout_sqe.user_data) continue; const our_cqe = OurCqe.fromCqe(raw_cqe); try consumeOurCqe(self, server_ctx, our_cqe); } @@ -109,44 +108,50 @@ fn consumeOurCqe( const entry = cqe.user_data; errdefer entry.deinit(server_ctx.allocator); - const entry_data: *EntryData = entry.ptr orelse { - // `accept_multishot` cqe - - // we may need to re-submit the `accept_multishot` sqe. - const accept_cancelled = cqe.flags & std.os.linux.IORING_CQE_F_MORE == 0; - if (accept_cancelled) try prepMultishotAccept(&liou.io_uring, server_ctx.tcp); - - switch (try connection.handleAcceptResult(cqe.err())) { - .success => {}, - // just quickly exit; if we need to re-issue, that's already handled above - .intr, - .again, - .conn_aborted, - .proto_fail, - => return, - } + const entry_data: *EntryData = switch (entry.tag) { + _ => entry.ptr, + .timeout => return, + .accept => { + // `accept_multishot` cqe - const stream: std.net.Stream = .{ .handle = cqe.res }; - errdefer stream.close(); + // we may need to re-submit the `accept_multishot` sqe. + const accept_cancelled = cqe.flags & std.os.linux.IORING_CQE_F_MORE == 0; + if (accept_cancelled) { + try prepMultishotAccept(&liou.io_uring, server_ctx.tcp); + } - server_ctx.wait_group.start(); - errdefer server_ctx.wait_group.finish(); + switch (try connection.handleAcceptResult(cqe.err())) { + .success => {}, + // just quickly exit; if we need to re-issue, that's already handled above + .intr, + .again, + .conn_aborted, + .proto_fail, + => return, + } - const buffer = try server_ctx.allocator.alloc(u8, server_ctx.read_buffer_size); - errdefer server_ctx.allocator.free(buffer); + const stream: std.net.Stream = .{ .handle = cqe.res }; + errdefer stream.close(); - const data_ptr = try server_ctx.allocator.create(EntryData); - errdefer server_ctx.allocator.destroy(data_ptr); - data_ptr.* = .{ - .buffer = buffer, - .stream = stream, - .state = EntryState.INIT, - }; + server_ctx.wait_group.start(); + errdefer server_ctx.wait_group.finish(); + + const buffer = try server_ctx.allocator.alloc(u8, server_ctx.read_buffer_size); + errdefer server_ctx.allocator.free(buffer); - const sqe = try getSqeRetry(&liou.io_uring); - sqe.prep_recv(stream.handle, buffer, 0); - sqe.user_data = @bitCast(Entry{ .ptr = data_ptr }); - return; + const data_ptr = try server_ctx.allocator.create(EntryData); + errdefer server_ctx.allocator.destroy(data_ptr); + data_ptr.* = .{ + .buffer = buffer, + .stream = stream, + .state = EntryState.INIT, + }; + + const sqe = try getSqeRetry(&liou.io_uring); + sqe.prep_recv(stream.handle, buffer, 0); + sqe.user_data = @bitCast(Entry{ .ptr = data_ptr }); + return; + }, }; errdefer server_ctx.wait_group.finish(); @@ -361,7 +366,7 @@ fn consumeOurCqe( }, }, - .send_no_body => |*snb| { + .send_static_string => |*sss| { switch (try connection.handleSendResult(cqe.err())) { .success => {}, .again => std.debug.panic(eagain_panic_msg, .{}), @@ -374,12 +379,12 @@ fn consumeOurCqe( }, } const sent_len: usize = @intCast(cqe.res); - snb.end_index += sent_len; + sss.end_index += sent_len; - if (snb.end_index < snb.head.len) { - try snb.prepSend(entry, &liou.io_uring); + if (sss.end_index < sss.data.len) { + try sss.prepSend(entry, &liou.io_uring); return; - } else std.debug.assert(snb.end_index == snb.head.len); + } else std.debug.assert(sss.end_index == sss.data.len); entry.deinit(server_ctx.allocator); server_ctx.wait_group.finish(); @@ -405,7 +410,7 @@ fn handleRecvBody( ) HandleRecvBodyError!void { const logger = server_ctx.logger.withScope(LOGGER_SCOPE); - const entry_data = entry.ptr.?; + const entry_data = entry.ptr; std.debug.assert(body == &entry_data.state.recv_body); if (!body.head_info.method.requestHasBody()) { @@ -420,12 +425,11 @@ fn handleRecvBody( switch (body.head_info.method) { .POST => { entry_data.state = .{ - .send_no_body = EntryState.SendNoBody.initHttStatus( - .@"HTTP/1.0", + .send_static_string = EntryState.SendStaticString.initHttpStatusNoBody( .service_unavailable, ), }; - const snb = &entry_data.state.send_no_body; + const snb = &entry_data.state.send_static_string; try snb.prepSend(entry, &liou.io_uring); return; }, @@ -488,19 +492,31 @@ fn handleRecvBody( } return; }, - .unrecognized => {}, + .health => { + entry_data.state = .{ + .send_static_string = EntryState.SendStaticString.initHttpStatusString( + .ok, + "unknown", + ), + }; + const snb = &entry_data.state.send_static_string; + try snb.prepSend(entry, &liou.io_uring); + return; + }, + + .genesis_file => {}, + .not_found => {}, }, else => {}, } entry_data.state = .{ - .send_no_body = EntryState.SendNoBody.initHttStatus( - .@"HTTP/1.0", + .send_static_string = EntryState.SendStaticString.initHttpStatusNoBody( .not_found, ), }; - const snb = &entry_data.state.send_no_body; + const snb = &entry_data.state.send_static_string; try snb.prepSend(entry, &liou.io_uring); return; } @@ -531,14 +547,26 @@ const OurCqe = extern struct { } }; -const Entry = packed struct(u64) { - /// If null, this is an `accept` entry. - ptr: ?*EntryData, +const Entry = packed union { + /// Only valid if `tag` is an unnamed tag. + ptr: *EntryData, + tag: enum(u64) { + //! Unnamed values correspond to the `ptr` field. - const ACCEPT: Entry = .{ .ptr = null }; + accept = 0, + timeout = 1, + + _, + }, + + const ACCEPT: Entry = .{ .tag = .accept }; + const TIMEOUT: Entry = .{ .tag = .timeout }; fn deinit(self: Entry, allocator: std.mem.Allocator) void { - const ptr = self.ptr orelse return; + const ptr = switch (self.tag) { + .accept, .timeout => return, + _ => self.ptr, + }; ptr.deinit(allocator); allocator.destroy(ptr); } @@ -561,7 +589,7 @@ const EntryState = union(enum) { recv_body: RecvBody, send_file_head: SendFileHead, send_file_body: SendFileBody, - send_no_body: SendNoBody, + send_static_string: SendStaticString, const INIT: EntryState = .{ .recv_head = .{ @@ -576,7 +604,7 @@ const EntryState = union(enum) { .recv_body => {}, .send_file_head => |*sfh| sfh.deinit(), .send_file_body => |*sfb| sfb.deinit(), - .send_no_body => {}, + .send_static_string => {}, } } @@ -589,7 +617,7 @@ const EntryState = union(enum) { entry: Entry, io_uring: *IoUring, ) GetSqeRetryError!void { - const entry_ptr = entry.ptr.?; + const entry_ptr = entry.ptr; std.debug.assert(self == &entry_ptr.state.recv_head); const usable_buffer = entry_ptr.buffer[self.end..]; @@ -652,7 +680,7 @@ const EntryState = union(enum) { /// There is still more head data to send. sending_more, } { - const entry_data = entry.ptr.?; + const entry_data = entry.ptr; std.debug.assert(self == &entry_data.state.send_file_head); const rendered_len = blk: { @@ -665,7 +693,7 @@ const EntryState = union(enum) { const status: std.http.Status = .ok; writer.print("{[version]s} {[status]d}{[space]s}{[phrase]s}\r\n", .{ - .version = @tagName(std.http.Version.@"HTTP/1.0"), + .version = @tagName(HTTP_VERSION), .status = @intFromEnum(status), .space = if (status.phrase() != null) " " else "", .phrase = if (status.phrase()) |str| str else "", @@ -712,7 +740,7 @@ const EntryState = union(enum) { entry: Entry, io_uring: *IoUring, ) GetSqeRetryError!void { - const entry_ptr = entry.ptr.?; + const entry_ptr = entry.ptr; std.debug.assert(self == &entry_ptr.state.send_file_body); std.debug.assert(self.which == .to_pipe); @@ -732,7 +760,7 @@ const EntryState = union(enum) { entry: Entry, io_uring: *IoUring, ) GetSqeRetryError!void { - const entry_ptr = entry.ptr.?; + const entry_ptr = entry.ptr; std.debug.assert(self == &entry_ptr.state.send_file_body); std.debug.assert(self.which == .to_socket); @@ -750,40 +778,56 @@ const EntryState = union(enum) { } }; - const SendNoBody = struct { + const SendStaticString = struct { /// Should be a statically-lived string. - head: []const u8, + data: []const u8, end_index: usize, - fn initString(comptime str: []const u8) SendNoBody { + fn initRawString(comptime str: []const u8) SendStaticString { return .{ - .head = str, + .data = str, .end_index = 0, }; } - fn initHttStatus( - comptime version: std.http.Version, + fn initHttpStatusNoBody( + comptime status: std.http.Status, + ) SendStaticString { + return initRawString(statusLineStr(status) ++ "\r\n"); + } + + fn initHttpStatusString( + comptime status: std.http.Status, + comptime body: []const u8, + ) SendStaticString { + const str = comptime "" ++ + statusLineStr(status) ++ + std.fmt.comptimePrint("Content-Length: {d}\r\n", .{body.len}) ++ + "\r\n" ++ + body; + return initRawString(str); + } + + inline fn statusLineStr( comptime status: std.http.Status, - ) SendNoBody { - const head = comptime std.fmt.comptimePrint("{s} {d}{s}\r\n\r\n", .{ - @tagName(version), + ) []const u8 { + comptime return std.fmt.comptimePrint("{s} {d}{s}\r\n", .{ + @tagName(HTTP_VERSION), @intFromEnum(status), if (status.phrase()) |phrase| " " ++ phrase else "", }); - return initString(head); } fn prepSend( - self: *const SendNoBody, + self: *const SendStaticString, entry: Entry, io_uring: *IoUring, ) GetSqeRetryError!void { - const entry_ptr = entry.ptr.?; - std.debug.assert(self == &entry_ptr.state.send_no_body); + const entry_ptr = entry.ptr; + std.debug.assert(self == &entry_ptr.state.send_static_string); const sqe = try getSqeRetry(io_uring); - sqe.prep_send(entry_ptr.stream.handle, self.head[self.end_index..], 0); + sqe.prep_send(entry_ptr.stream.handle, self.data[self.end_index..], 0); sqe.user_data = @bitCast(entry); } }; diff --git a/src/rpc/server/requests.zig b/src/rpc/server/requests.zig index 993b431a5..51fb79187 100644 --- a/src/rpc/server/requests.zig +++ b/src/rpc/server/requests.zig @@ -17,6 +17,20 @@ pub const MAX_REQUEST_BODY_SIZE: usize = 50 * 1024; // 50 KiB const LOGGER_SCOPE = "rpc.server.requests"; +pub const ContentType = enum(u8) { + @"application/json", +}; + +pub const TargetBoundedStr = std.BoundedArray(u8, MAX_TARGET_LEN); +pub const MAX_TARGET_LEN: usize = blk: { + const SnapSpec = IncrementalSnapshotFileInfo.SnapshotArchiveNameFmtSpec; + break :blk "/".len + SnapSpec.fmtLenValue(.{ + .base_slot = std.math.maxInt(sig.core.Slot), + .slot = std.math.maxInt(sig.core.Slot), + .hash = sig.core.Hash.base58String(.{ .data = .{255} ** sig.core.Hash.SIZE }).constSlice(), + }); +}; + /// All of the relevant information from a request head parsed into a narrow /// format that is comprised of bounded data and can be copied by value. pub const HeadInfo = struct { @@ -69,26 +83,17 @@ pub const HeadInfo = struct { } }; -pub const ContentType = enum(u8) { - @"application/json", -}; - -pub const MAX_TARGET_LEN: usize = blk: { - const SnapSpec = IncrementalSnapshotFileInfo.SnapshotArchiveNameFmtSpec; - break :blk "/".len + SnapSpec.fmtLenValue(.{ - .base_slot = std.math.maxInt(sig.core.Slot), - .slot = std.math.maxInt(sig.core.Slot), - .hash = sig.core.Hash.base58String(.{ .data = .{255} ** sig.core.Hash.SIZE }).constSlice(), - }); -}; -pub const TargetBoundedStr = std.BoundedArray(u8, MAX_TARGET_LEN); - pub const GetRequestTargetResolved = union(enum) { - unrecognized, full_snapshot: struct { FullSnapshotFileInfo, SnapshotReadLock }, inc_snapshot: struct { IncrementalSnapshotFileInfo, SnapshotReadLock }, - // TODO: also handle the snapshot archive aliases & other routes + /// https://github.com/Syndica/sig/issues/558 + health, + + /// https://github.com/Syndica/sig/issues/557 + genesis_file, + + not_found, pub const SnapshotReadLock = sig.sync.RwMux(?SnapshotGenerationInfo).RLockGuard; }; @@ -96,31 +101,29 @@ pub const GetRequestTargetResolved = union(enum) { /// Resolve a `GET` request target. pub fn getRequestTargetResolve( unscoped_logger: sig.trace.Logger, - target: []const u8, + path: []const u8, latest_snapshot_gen_info_rw: *sig.sync.RwMux(?SnapshotGenerationInfo), ) GetRequestTargetResolved { const logger = unscoped_logger.withScope(LOGGER_SCOPE); - if (!std.mem.startsWith(u8, target, "/")) return .unrecognized; - const path = target[1..]; + if (!std.mem.startsWith(u8, path, "/")) return .not_found; + const target = path[1..]; const is_snapshot_archive_like = - !std.meta.isError(FullSnapshotFileInfo.parseFileNameTarZst(path)) or - !std.meta.isError(IncrementalSnapshotFileInfo.parseFileNameTarZst(path)); + !std.meta.isError(FullSnapshotFileInfo.parseFileNameTarZst(target)) or + !std.meta.isError(IncrementalSnapshotFileInfo.parseFileNameTarZst(target)); - if (is_snapshot_archive_like) { - // we hold the lock for the entirety of this process in order to prevent - // the snapshot generation process from deleting the associated snapshot. + if (is_snapshot_archive_like) check_snapshots: { const maybe_latest_snapshot_gen_info, // var latest_snapshot_info_lg // = latest_snapshot_gen_info_rw.readWithLock(); - errdefer latest_snapshot_info_lg.unlock(); + defer latest_snapshot_info_lg.unlock(); - const full_info: ?FullSnapshotFileInfo, // + const full_info: FullSnapshotFileInfo, // const inc_info: ?IncrementalSnapshotFileInfo // = blk: { const latest_snapshot_gen_info = maybe_latest_snapshot_gen_info.* orelse - break :blk .{ null, null }; + break :check_snapshots; const latest_full = latest_snapshot_gen_info.full; const full_info: FullSnapshotFileInfo = .{ .slot = latest_full.slot, @@ -137,30 +140,41 @@ pub fn getRequestTargetResolve( }; logger.debug().logf("Available full: {?s}", .{ - if (full_info) |info| info.snapshotArchiveName().constSlice() else null, + full_info.snapshotArchiveName().constSlice(), }); logger.debug().logf("Available inc: {?s}", .{ if (inc_info) |info| info.snapshotArchiveName().constSlice() else null, }); - if (full_info) |full| { - const full_archive_name_bounded = full.snapshotArchiveName(); - const full_archive_name = full_archive_name_bounded.constSlice(); - if (std.mem.eql(u8, path, full_archive_name)) { - return .{ .full_snapshot = .{ full, latest_snapshot_info_lg } }; - } + const full_archive_name_bounded = full_info.snapshotArchiveName(); + const full_archive_name = full_archive_name_bounded.constSlice(); + if (std.mem.eql(u8, target, full_archive_name)) { + // acquire another lock on the rwmux, since the first one we got is going to unlock after we return. + const latest_snapshot_info_lg_again = latest_snapshot_gen_info_rw.read(); + return .{ + .full_snapshot = .{ + full_info, + latest_snapshot_info_lg_again, + }, + }; } if (inc_info) |inc| { const inc_archive_name_bounded = inc.snapshotArchiveName(); const inc_archive_name = inc_archive_name_bounded.constSlice(); - if (std.mem.eql(u8, path, inc_archive_name)) { - return .{ .inc_snapshot = .{ inc, latest_snapshot_info_lg } }; + if (std.mem.eql(u8, target, inc_archive_name)) { + // acquire another lock on the rwmux, since the first one we got is going to unlock after we return. + const latest_snapshot_info_lg_again = latest_snapshot_gen_info_rw.read(); + return .{ .inc_snapshot = .{ inc, latest_snapshot_info_lg_again } }; } } } - return .unrecognized; + if (std.mem.eql(u8, target, "health")) { + return .health; + } + + return .not_found; } pub fn methodFmt(method: std.http.Method) MethodFmt {