From 7c394063b4175164ddf19a95e0ccb93726079c11 Mon Sep 17 00:00:00 2001 From: Sebastian <15335529+Sobeston@users.noreply.github.com> Date: Wed, 12 Feb 2025 16:40:43 +0000 Subject: [PATCH] perf(accountsdb): bufferpool-integration (#476) * Bufferpool: add CachedRead.borrowSlice * Accountsdb: add bufferpool field * use ReadHandle in accountsdb * get it building * wip - debugging * make a mess * comment all test failures * add probes * test and fix ReadHandle's bincode * misc fixes * cleanup * fix line lengths * make freq atomic * all tests passing / misc fixes * add atomic linear fifo * use threadlocal io_uring * frames metadata: use atomics for all fields * make style check happy * small cleanup * add flags for benchmark profiling * rename iterator to byteIterator, add chunked Iterator * replace iterator with faster one; simplified code * remove probes * address comments * remove .inner on ReadHandle * misc cleanup * handle full queue, rollback rcs on error * blocking io: rc cleanup on error path * remove allocation from hot path * line length check * seek files to 0 before reading all * use *const ReadHandle * split out owned and unowned buffers * remove useless errors from read + related functions * store and document file_length on accountfiles * move file_map append * init_allocator -> allocator * cloneAndTrack: duplicate cached read handles * add TODO for io_uring cli flag * cleanup some io_uring code * split out FrameManager and properly address thread safety * remove atomicstack * make style check happy * improve threadsafety / add comments on it * bugfix: use preadall * batch some lock usages / add comments * remove .size, factor out Metadata, remove some atomics * rework locking / fix threadsafety issue / temp regress io_uring * make fast path / reduce contention * squashme * concurrency rework * bring back io_uring * simplify io_uring & wait less * remove dead code * reduce read calls * default to blocking io * align frames * removed dead code * style check * fix bad diff * remove noinlines used for debugging * correctly update populated on read * remove incorrect asserts (triggers rarely, not actually a bug) * remove dead code * naming & comments improvements * small naming/style fixes * move constants to top * some renames / style changes * minor style fixes * macos: correct page size * test some errdefers / increase codecov --- build.zig | 11 +- src/accountsdb/accounts_file.zig | 366 +++++-- src/accountsdb/buffer_pool.zig | 1540 ++++++++++++++++++------------ src/accountsdb/cache.zig | 20 +- src/accountsdb/db.zig | 258 +++-- src/accountsdb/fuzz.zig | 5 +- src/accountsdb/snapshots.zig | 8 +- src/core/account.zig | 53 +- src/geyser/main.zig | 2 +- 9 files changed, 1476 insertions(+), 787 deletions(-) diff --git a/build.zig b/build.zig index 9d5415496..429bb17a0 100644 --- a/build.zig +++ b/build.zig @@ -132,9 +132,10 @@ pub fn build(b: *Build) void { .root_source_file = b.path("src/tests.zig"), .target = target, .optimize = optimize, - .sanitize_thread = enable_tsan, .filters = filters orelse &.{}, + .sanitize_thread = enable_tsan, }); + b.installArtifact(unit_tests_exe); test_step.dependOn(&unit_tests_exe.step); install_step.dependOn(&unit_tests_exe.step); @@ -210,7 +211,13 @@ pub fn build(b: *Build) void { benchmark_exe.linkLibC(); benchmark_exe.root_module.addOptions("build-options", build_options); - benchmark_exe.root_module.addImport("xev", xev_mod); + // make sure pyroscope's got enough info to profile + benchmark_exe.build_id = .fast; + 
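// Keeping frame pointers and symbols, plus a build id, lets sampling profilers unwind and symbolise stacks in optimised builds. +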
benchmark_exe.root_module.omit_frame_pointer = false; + benchmark_exe.root_module.strip = false; + + b.installArtifact(benchmark_exe); + benchmark_exe.root_module.addImport("base58", base58_mod); benchmark_exe.root_module.addImport("zig-network", zig_network_mod); benchmark_exe.root_module.addImport("zstd", zstd_mod); diff --git a/src/accountsdb/accounts_file.zig b/src/accountsdb/accounts_file.zig index 33bf32d9d..ba338c007 100644 --- a/src/accountsdb/accounts_file.zig +++ b/src/accountsdb/accounts_file.zig @@ -8,6 +8,8 @@ const Slot = sig.core.time.Slot; const Epoch = sig.core.time.Epoch; const Pubkey = sig.core.pubkey.Pubkey; const AccountFileInfo = sig.accounts_db.snapshots.AccountFileInfo; +const AccountDataHandle = sig.accounts_db.buffer_pool.AccountDataHandle; +const BufferPool = sig.accounts_db.buffer_pool.BufferPool; const writeIntLittleMem = sig.core.account.writeIntLittleMem; @@ -78,11 +80,11 @@ pub const FileId = enum(Int) { /// /// Analogous to [StoredAccountMeta::AppendVec](https://github.com/anza-xyz/agave/blob/f8067ea7883e04bdfc1a82b0779f7363b71bf548/accounts-db/src/account_storage/meta.rs#L21) pub const AccountInFile = struct { - // pointers to mmap contents - store_info: *StorageInfo, - account_info: *AccountInfo, - hash_ptr: *Hash, - data: []u8, + store_info: StorageInfo, + account_info: AccountInfo, + hash: Hash, + + data: AccountDataHandle, // other info (used when parsing accounts out) offset: usize = 0, @@ -162,25 +164,29 @@ pub const AccountInFile = struct { const Self = @This(); + pub fn deinit(self: AccountInFile, allocator: std.mem.Allocator) void { + self.data.deinit(allocator); + } + pub fn getSizeInFile(self: *const Self) u64 { return std.mem.alignForward( usize, - AccountInFile.STATIC_SIZE + self.data.len, + AccountInFile.STATIC_SIZE + self.data.len(), @sizeOf(u64), ); } pub fn validate(self: *const Self) ValidateError!void { // make sure upper bits are zero - const exec_byte = @as(*u8, @ptrCast(self.executable())); - const valid_exec = exec_byte.* & ~@as(u8, 1) == 0; + const exec_byte = @as(*const u8, @ptrCast(self.executable())).*; + const valid_exec = exec_byte & ~@as(u8, 1) == 0; if (!valid_exec) { return error.InvalidExecutableFlag; } const valid_lamports = self.account_info.lamports != 0 or ( // ie, is default account - self.data.len == 0 and + self.data.len() == 0 and self.owner().isZeroed() and self.executable().* == false and self.rent_epoch().* == 0); @@ -189,23 +195,13 @@ pub const AccountInFile = struct { } } - pub fn toOwnedAccount( + /// requires .data to be owned by the BufferPool + pub fn dupeCachedAccount( self: *const Self, allocator: std.mem.Allocator, ) std.mem.Allocator.Error!Account { - const owned_data = try allocator.dupe(u8, self.data); - return .{ - .data = owned_data, - .executable = self.executable().*, - .lamports = self.lamports().*, - .owner = self.owner().*, - .rent_epoch = self.rent_epoch().*, - }; - } - - pub fn toAccount(self: *const Self) Account { return .{ - .data = self.data, + .data = try self.data.duplicateBufferPoolRead(allocator), .executable = self.executable().*, .lamports = self.lamports().*, .owner = self.owner().*, @@ -213,43 +209,43 @@ pub const AccountInFile = struct { }; } - pub inline fn pubkey(self: *const Self) *Pubkey { + pub inline fn pubkey(self: *const Self) *const Pubkey { return &self.store_info.pubkey; } - pub inline fn lamports(self: *const Self) *u64 { + pub inline fn lamports(self: *const Self) *const u64 { return &self.account_info.lamports; } - pub inline fn owner(self: *const Self) *Pubkey { + 
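// These accessors now return const pointers: AccountInFile holds copies read via the BufferPool rather than pointers into a writable mmap. +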
pub inline fn owner(self: *const Self) *const Pubkey {
        return &self.account_info.owner;
    }

-    pub inline fn executable(self: *const Self) *bool {
+    pub inline fn executable(self: *const Self) *const bool {
        return &self.account_info.executable;
    }

-    pub inline fn rent_epoch(self: *const Self) *Epoch {
+    pub inline fn rent_epoch(self: *const Self) *const Epoch {
        return &self.account_info.rent_epoch;
    }

-    pub inline fn hash(self: *const Self) *Hash {
-        return self.hash_ptr;
+    pub inline fn hash(self: *const Self) *const Hash {
+        return &self.hash;
    }

    pub fn writeToBuf(self: *const Self, buf: []u8) usize {
-        std.debug.assert(buf.len >= STATIC_SIZE + self.data.len);
+        std.debug.assert(buf.len >= STATIC_SIZE + self.data.len());

        var offset: usize = 0;

        offset += self.store_info.writeToBuf(buf[offset..]);
        offset += self.account_info.writeToBuf(buf[offset..]);

-        @memcpy(buf[offset..(offset + 32)], &self.hash().data);
+        @memcpy(buf[offset..(offset + 32)], &self.hash.data);
        offset += 32;
        offset = std.mem.alignForward(usize, offset, @sizeOf(u64));

-        @memcpy(buf[offset..(offset + self.data.len)], self.data);
-        offset += self.data.len;
+        self.data.readAll(buf[offset..][0..self.data.len()]);
+        offset += self.data.len();
        offset = std.mem.alignForward(usize, offset, @sizeOf(u64));

        return offset;
@@ -259,12 +255,17 @@ pub const AccountInFile = struct {
 /// Analogous to [AccountStorageEntry](https://github.com/anza-xyz/agave/blob/4c921ca276bbd5997f809dec1dd3937fb06463cc/accounts-db/src/accounts_db.rs#L1069)
 pub const AccountFile = struct {
    // file contents
-    memory: []align(std.mem.page_size) u8,
+    file: std.fs.File,
+
    id: FileId,
    slot: Slot,

-    // number of bytes used
+
+    /// The number of usefully readable bytes in the file.
    length: usize,
+    /// The size of the file; this can exceed `length`. Bytes beyond `length` are junk.
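+    /// (Reads through the BufferPool are bounded by `length`, not `file_size`:
+    /// getSlice returns error.EOF for any range extending past `length`.)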
+ file_size: usize, + // number of accounts stored in the file number_of_accounts: usize = 0, @@ -276,34 +277,39 @@ pub const AccountFile = struct { try accounts_file_info.validate(file_size); - const memory = try std.posix.mmap( - null, - file_size, - std.posix.PROT.READ, - std.posix.MAP{ .TYPE = .PRIVATE }, - file.handle, - 0, - ); - return .{ - .memory = memory, + .file = file, .length = accounts_file_info.length, + .file_size = file_size, .id = accounts_file_info.id, .slot = slot, }; } pub fn deinit(self: Self) void { - std.posix.munmap(self.memory); + self.file.close(); } - pub fn validate(self: *const Self) !usize { + pub fn validate( + self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, + ) !usize { var offset: usize = 0; var number_of_accounts: usize = 0; var account_bytes: usize = 0; while (true) { - const account = self.readAccount(offset) catch break; + const account = self.readAccount( + metadata_allocator, + buffer_pool, + offset, + ) catch |err| switch (err) { + error.EOF => break, + else => return err, + }; + defer account.deinit(metadata_allocator); + try account.validate(); offset = offset + account.len; number_of_accounts += 1; @@ -321,19 +327,21 @@ pub const AccountFile = struct { /// (used when computing account hashes for snapshot validation) pub fn getAccountHashAndLamports( self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, start_offset: usize, - ) error{EOF}!struct { hash: *Hash, lamports: *u64 } { + ) !struct { hash: Hash, lamports: u64 } { var offset = start_offset; offset += @sizeOf(AccountInFile.StorageInfo); offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); - const lamports = try self.getType(&offset, u64); + const lamports = try self.getType(metadata_allocator, buffer_pool, &offset, u64); offset += @sizeOf(AccountInFile.AccountInfo) - @sizeOf(u64); offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); - const hash = try self.getType(&offset, Hash); + const hash = try self.getType(metadata_allocator, buffer_pool, &offset, Hash); return .{ .hash = hash, @@ -365,27 +373,183 @@ pub const AccountFile = struct { }; } - pub fn readAccount(self: *const Self, start_offset: usize) error{EOF}!AccountInFile { + const max_header_buf_len = max_size: { + var max_size = 0; + for (0..@sizeOf(u64)) |i| { + var start = i; + + start += @sizeOf(AccountInFile.StorageInfo); + start = std.mem.alignForward(usize, start, @sizeOf(u64)); + start += @sizeOf(AccountInFile.AccountInfo); + start = std.mem.alignForward(usize, start, @sizeOf(u64)); + + start += @sizeOf(Hash); + start = std.mem.alignForward(usize, start, @sizeOf(u64)); + + max_size = @max(max_size, start); + } + break :max_size max_size; + }; + + pub fn readAccount( + self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, + start_offset: usize, + ) !AccountInFile { var offset = start_offset; - const store_info = try self.getType(&offset, AccountInFile.StorageInfo); - const account_info = try self.getType(&offset, AccountInFile.AccountInfo); - const hash = try self.getType(&offset, Hash); - const data = try self.getSlice(&offset, store_info.data_len); + offset += @sizeOf(AccountInFile.StorageInfo); + offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); + + const account_info_start = offset; + offset += @sizeOf(AccountInFile.AccountInfo); + offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); + + const hash_start = offset; + offset += @sizeOf(Hash); + offset = std.mem.alignForward(usize, offset, 
@sizeOf(u64)); + + const header_byte_len = offset - start_offset; + std.debug.assert(header_byte_len <= max_header_buf_len); + + var offset_restarted = start_offset; + const read = try self.getSlice( + metadata_allocator, + buffer_pool, + &offset_restarted, + header_byte_len, + ); + std.debug.assert(offset == offset_restarted); + defer read.deinit(metadata_allocator); + + var buf: [max_header_buf_len]u8 = undefined; + read.readAll(buf[0..header_byte_len]); + + var store_info: AccountInFile.StorageInfo = undefined; + @memcpy( + std.mem.asBytes(&store_info), + buf[0..][0..@sizeOf(AccountInFile.StorageInfo)], + ); + + var account_info: AccountInFile.AccountInfo = undefined; + @memcpy( + std.mem.asBytes(&account_info), + buf[account_info_start - start_offset ..][0..@sizeOf(AccountInFile.AccountInfo)], + ); + + var hash: Hash = undefined; + @memcpy( + std.mem.asBytes(&hash), + buf[hash_start - start_offset ..][0..@sizeOf(Hash)], + ); + + const data = try self.getSlice( + metadata_allocator, + buffer_pool, + &offset_restarted, + store_info.data_len, + ); + errdefer data.deinit(metadata_allocator); + + const len = offset_restarted - start_offset; + + return AccountInFile{ + .store_info = store_info, + .account_info = account_info, + .hash = hash, + .data = data, + .len = len, + .offset = start_offset, + }; + } + + pub fn readAccountNoData( + self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, + start_offset: usize, + ) !AccountInFile { + var offset = start_offset; + + offset += @sizeOf(AccountInFile.StorageInfo); + offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); + + const account_info_start = offset; + offset += @sizeOf(AccountInFile.AccountInfo); + offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); + + const hash_start = offset; + offset += @sizeOf(Hash); + offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); + + const header_byte_len = offset - start_offset; + std.debug.assert(header_byte_len <= max_header_buf_len); + + var offset_restarted = start_offset; + const read = try self.getSlice( + metadata_allocator, + buffer_pool, + &offset_restarted, + header_byte_len, + ); + std.debug.assert(offset == offset_restarted); + defer read.deinit(metadata_allocator); + + var buf: [max_header_buf_len]u8 = undefined; + read.readAll(buf[0..header_byte_len]); + + var store_info: AccountInFile.StorageInfo = undefined; + @memcpy( + std.mem.asBytes(&store_info), + buf[0..][0..@sizeOf(AccountInFile.StorageInfo)], + ); + + var account_info: AccountInFile.AccountInfo = undefined; + @memcpy( + std.mem.asBytes(&account_info), + buf[account_info_start - start_offset ..][0..@sizeOf(AccountInFile.AccountInfo)], + ); + + var hash: Hash = undefined; + @memcpy( + std.mem.asBytes(&hash), + buf[hash_start - start_offset ..][0..@sizeOf(Hash)], + ); + + const start_index = offset; + const result = @addWithOverflow(start_index, store_info.data_len); + const end_index = result[0]; + const overflow_flag = result[1]; + if (overflow_flag == 1 or end_index > self.length) { + return error.EOF; + } + + const data = AccountDataHandle.initEmpty( + std.math.cast(u32, store_info.data_len) orelse return error.EOF, + ); + + offset = std.mem.alignForward(usize, end_index, @sizeOf(u64)); const len = offset - start_offset; return AccountInFile{ .store_info = store_info, .account_info = account_info, - .hash_ptr = hash, + .hash = hash, .data = data, .len = len, .offset = start_offset, }; } - pub fn getSlice(self: *const Self, start_index_ptr: *usize, length: usize) error{EOF}![]u8 
{ + pub fn getSlice( + self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, + start_index_ptr: *usize, + length: usize, + ) !AccountDataHandle { const start_index = start_index_ptr.*; const result = @addWithOverflow(start_index, length); const end_index = result[0]; @@ -394,22 +558,66 @@ pub const AccountFile = struct { if (overflow_flag == 1 or end_index > self.length) { return error.EOF; } + start_index_ptr.* = std.mem.alignForward(usize, end_index, @sizeOf(u64)); - return @ptrCast(self.memory[start_index..end_index]); + return try buffer_pool.read( + metadata_allocator, + self.file, + self.id, + @intCast(start_index), + @intCast(end_index), + ); } - pub fn getType(self: *const Self, start_index_ptr: *usize, comptime T: type) error{EOF}!*T { + pub fn getType( + self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, + start_index_ptr: *usize, + comptime T: type, + ) !T { const length = @sizeOf(T); - return @alignCast(@ptrCast(try self.getSlice(start_index_ptr, length))); + + const read = try self.getSlice(metadata_allocator, buffer_pool, start_index_ptr, length); + defer read.deinit(metadata_allocator); + + var buf: T = undefined; + read.readAll(std.mem.asBytes(&buf)); + return buf; } pub const Iterator = struct { accounts_file: *const AccountFile, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, offset: usize = 0, - pub fn next(self: *Iterator) ?AccountInFile { + pub fn next(self: *Iterator) !?AccountInFile { while (true) { - const account = self.accounts_file.readAccount(self.offset) catch break; + const account = self.accounts_file.readAccount( + self.metadata_allocator, + self.buffer_pool, + self.offset, + ) catch |err| switch (err) { + error.EOF => break, + else => return err, + }; + self.offset = self.offset + account.len; + return account; + } + return null; + } + + pub fn nextNoData(self: *Iterator) !?AccountInFile { + while (true) { + const account = self.accounts_file.readAccountNoData( + self.metadata_allocator, + self.buffer_pool, + self.offset, + ) catch |err| switch (err) { + error.EOF => break, + else => return err, + }; self.offset = self.offset + account.len; return account; } @@ -421,8 +629,16 @@ pub const AccountFile = struct { } }; - pub fn iterator(self: *const Self) Iterator { - return .{ .accounts_file = self }; + pub fn iterator( + self: *const Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, + ) Iterator { + return .{ + .accounts_file = self, + .metadata_allocator = metadata_allocator, + .buffer_pool = buffer_pool, + }; } }; @@ -433,14 +649,24 @@ test "core.accounts_file: verify accounts file" { .id = FileId.fromInt(0), .length = 162224, }; + + var bp = try BufferPool.init(std.testing.allocator, 1000); + defer bp.deinit(std.testing.allocator); + var accounts_file = try AccountFile.init(file, file_info, 10); defer accounts_file.deinit(); - _ = try accounts_file.validate(); + _ = try accounts_file.validate(std.testing.allocator, &bp); + + const account = try accounts_file.readAccount(std.testing.allocator, &bp, 0); + defer account.deinit(std.testing.allocator); - const account = try accounts_file.readAccount(0); - const hash_and_lamports = try accounts_file.getAccountHashAndLamports(0); + const hash_and_lamports = try accounts_file.getAccountHashAndLamports( + std.testing.allocator, + &bp, + 0, + ); - try std.testing.expectEqual(account.lamports().*, hash_and_lamports.lamports.*); - try std.testing.expectEqual(account.hash().*, hash_and_lamports.hash.*); 
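+    // getAccountHashAndLamports now returns the hash and lamports by value (copied out
+    // of the BufferPool read), so the test compares values rather than dereferencing
+    // pointers into the old mmap.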
+ try std.testing.expectEqual(account.lamports().*, hash_and_lamports.lamports); + try std.testing.expectEqual(account.hash, hash_and_lamports.hash); } diff --git a/src/accountsdb/buffer_pool.zig b/src/accountsdb/buffer_pool.zig index c08732785..d12887c1d 100644 --- a/src/accountsdb/buffer_pool.zig +++ b/src/accountsdb/buffer_pool.zig @@ -2,14 +2,23 @@ const std = @import("std"); const sig = @import("../sig.zig"); const builtin = @import("builtin"); +const IoUring = std.os.linux.IoUring; +const Atomic = std.atomic.Value; + const FileId = sig.accounts_db.accounts_file.FileId; +const bincode = sig.bincode; /// arbitrarily chosen, I believe >95% of accounts will be <= 512 bytes -const FRAME_SIZE = 512; +pub const FRAME_SIZE = 512; +pub const Frame = [FRAME_SIZE]u8; + const INVALID_FRAME = std.math.maxInt(FrameIndex); -const Frame = [FRAME_SIZE]u8; -/// we can get away with a 32-bit index -const FrameIndex = u32; +const LINUX_IO_MODE: LinuxIoMode = .Blocking; +// TODO: ideally we should be able to select this with a cli flag. (#509) +const USE_IO_URING = builtin.os.tag == .linux and LINUX_IO_MODE == .IoUring; +const IO_URING_ENTRIES = 128; + +const FrameIndex = u31; const FileOffset = u32; const FrameOffset = u10; // 0..=FRAME_SIZE @@ -19,26 +28,68 @@ comptime { _ = offset; } +const FrameRef = packed struct(u32) { + index: FrameIndex, + found_in_cache: bool, + + const INIT = FrameRef{ + .index = INVALID_FRAME, + .found_in_cache = false, + }; +}; + const LinuxIoMode = enum { Blocking, IoUring, }; -const linux_io_mode: LinuxIoMode = .IoUring; -const use_io_uring = builtin.os.tag == .linux and linux_io_mode == .IoUring; +fn ioUring() !*IoUring { + // We use one io_uring instance per-thread internally for fast thread-safe usage. + + // From https://github.com/axboe/liburing/wiki/io_uring-and-networking-in-2023: + // > Not sharing a ring between threads is the recommended way to use rings in general, as it + // > avoids any unnecessary synchronization. Available since 6.1. + + const threadlocals = struct { + threadlocal var io_uring: ?IoUring = null; + }; + + _ = threadlocals.io_uring orelse { + threadlocals.io_uring = try IoUring.init( + IO_URING_ENTRIES, + // Causes an error if we try to init with more entries than the kernel supports - it + // would be bad if the kernel gave us fewer than we expect to have. + std.os.linux.IORING_SETUP_CLAMP, + ); + }; + + return &(threadlocals.io_uring.?); +} const FileIdFileOffset = packed struct(u64) { + file_id: FileId, + + /// offset in the file from which the frame begin + /// always a multiple of FRAME_SIZE + file_offset: FileOffset, + const INVALID: FileIdFileOffset = .{ .file_id = FileId.fromInt(std.math.maxInt(FileId.Int)), // disambiguate from 0xAAAA / will trigger asserts as it's not even. .file_offset = 0xBAAD, }; +}; - file_id: FileId, +const IoUringError = err: { + var Error = error{ NotOpenForReading, InputOutput, IsDir }; + const fns = &.{ IoUring.read, IoUring.submit_and_wait, IoUring.copy_cqes, IoUring.init }; + for (fns) |func| { + Error = Error || @typeInfo( + @typeInfo(@TypeOf(func)).Fn.return_type.?, + ).ErrorUnion.error_set; + } - /// offset in the file from which the frame begin - /// always a multiple of FRAME_SIZE - file_offset: FileOffset, + break :err Error; }; /// Used for obtaining cached reads. @@ -53,208 +104,84 @@ const FileIdFileOffset = packed struct(u64) { /// A frame dies when its index is evicted from HierarchicalFifo (inside of /// evictUnusedFrame). 
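+/// (As of this patch, eviction happens inside FrameManager.getAllIndices, which calls
+/// HierarchicalFIFO.evict() when a requested frame is not already cached.)
+///
+/// A rough usage sketch (hypothetical call site; assumes `file` holds at least 512
+/// readable bytes at offset 0):
+///
+///     var bp = try BufferPool.init(allocator, num_frames);
+///     defer bp.deinit(allocator);
+///     const handle = try bp.read(allocator, file, file_id, 0, 512);
+///     defer handle.deinit(allocator);
+///     var buf: [512]u8 = undefined;
+///     handle.readAll(buf[0..handle.len()]);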
pub const BufferPool = struct { - pub const FrameMap = std.AutoHashMapUnmanaged(FileIdFileOffset, FrameIndex); - - /// indices of all free frames - /// free frames have a refcount of 0 *and* have been evicted - free_list: AtomicStack(FrameIndex), - - /// uniquely identifies a frame - /// for finding your wanted index - /// TODO: a concurrent hashmap would be more appropriate - frame_map_rw: sig.sync.RwMux(FrameMap), - frames: []Frame, - frames_metadata: FramesMetadata, - - /// used for eviction to free less popular (rc=0) frames first - eviction_lfu: HierarchicalFIFO, + frame_manager: FrameManager, - /// NOTE: we might want this to be a threadlocal for best performance? I don't think this field is threadsafe - io_uring: if (use_io_uring) std.os.linux.IoUring else void, + pub const ReadBlockingError = FrameManager.GetError || std.posix.PReadError; + pub const ReadIoUringError = FrameManager.GetError || IoUringError; + pub const ReadError = if (USE_IO_URING) ReadIoUringError else ReadBlockingError; pub fn init( - init_allocator: std.mem.Allocator, + allocator: std.mem.Allocator, num_frames: u32, ) !BufferPool { if (num_frames == 0 or num_frames == 1) return error.InvalidArgument; - const frames = try init_allocator.alignedAlloc(Frame, std.mem.page_size, num_frames); - errdefer init_allocator.free(frames); - - var frames_metadata = try FramesMetadata.init(init_allocator, num_frames); - errdefer frames_metadata.deinit(init_allocator); - - var free_list = try AtomicStack(FrameIndex).init(init_allocator, num_frames); - errdefer free_list.deinit(init_allocator); - for (0..num_frames) |i| free_list.appendAssumeCapacity(@intCast(i)); + // Alignment of frames is good for read performance (and necessary if we want to use O_DIRECT.) + const frames = try allocator.alignedAlloc(Frame, std.mem.page_size, num_frames); + errdefer allocator.free(frames); - var io_uring = if (use_io_uring) blk: { - // NOTE: this is pretty much a guess, maybe worth tweaking? 
- // think this is a bit on the high end, libxev uses 256 - const io_uring_entries = 4096; - - break :blk try std.os.linux.IoUring.init( - io_uring_entries, - 0, - ); - } else {}; - errdefer if (use_io_uring) io_uring.deinit(); - - var frame_map: FrameMap = .{}; - try frame_map.ensureTotalCapacity(init_allocator, num_frames); - errdefer frame_map.deinit(init_allocator); - - const frame_map_rw = sig.sync.RwMux(FrameMap).init(frame_map); + var frame_manager = try FrameManager.init(allocator, num_frames); + errdefer frame_manager.deinit(allocator); return .{ .frames = frames, - .frames_metadata = frames_metadata, - .free_list = free_list, - .frame_map_rw = frame_map_rw, - .eviction_lfu = try HierarchicalFIFO.init(init_allocator, num_frames / 10, num_frames), - .io_uring = io_uring, + .frame_manager = frame_manager, }; } - pub fn deinit(self: *BufferPool, init_allocator: std.mem.Allocator) void { - init_allocator.free(self.frames); - self.frames_metadata.deinit(init_allocator); - if (use_io_uring) self.io_uring.deinit(); - self.free_list.deinit(init_allocator); - self.eviction_lfu.deinit(init_allocator); - const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); - frame_map.deinit(init_allocator); - frame_map_lg.unlock(); - } - - pub fn computeNumberofFrameIndices( - /// inclusive - file_offset_start: FileOffset, - /// exclusive - file_offset_end: FileOffset, - ) error{InvalidArgument}!u32 { - if (file_offset_start > file_offset_end) return error.InvalidArgument; - if (file_offset_start == file_offset_end) return 0; - - const starting_frame = file_offset_start / FRAME_SIZE; - const ending_frame = (file_offset_end - 1) / FRAME_SIZE; - - return ending_frame - starting_frame + 1; + pub fn deinit( + self: *BufferPool, + allocator: std.mem.Allocator, + ) void { + allocator.free(self.frames); + self.frame_manager.deinit(allocator); } - /// allocates the required amount of indices, sets them all to - /// INVALID_FRAME, overwriting with a valid frame where one is found. - /// INVALID_FRAME indicates that there is no frame in the BufferPool for the - /// given file_id and range. - fn computeFrameIndices( + fn readBlocking( self: *BufferPool, - file_id: FileId, + /// used for temp allocations, and the returned .indices slice allocator: std.mem.Allocator, + file: std.fs.File, + file_id: FileId, /// inclusive file_offset_start: FileOffset, /// exclusive file_offset_end: FileOffset, - ) error{ InvalidArgument, OffsetsOutOfBounds, OutOfMemory }![]FrameIndex { - const n_indices = try computeNumberofFrameIndices(file_offset_start, file_offset_end); + ) ReadBlockingError!AccountDataHandle { + const frame_refs = try self.frame_manager.getAllIndices( + allocator, + file_id, + file_offset_start, + file_offset_end, + ); + errdefer allocator.free(frame_refs); + errdefer self.frame_manager.getAllIndicesRollback(frame_refs); - if (n_indices > self.frames.len) return error.OffsetsOutOfBounds; + // read into frames without valid data + for (frame_refs, 0..) 
|*frame_ref, i| { + const contains_valid_data = self.frame_manager + .contains_valid_data[frame_ref.index].load(.acquire); + if (contains_valid_data) continue; - const frame_indices = try allocator.alloc(FrameIndex, n_indices); - for (frame_indices) |*f_idx| f_idx.* = INVALID_FRAME; + const frame_aligned_file_offset: FileOffset = @intCast((i * FRAME_SIZE) + + (file_offset_start - file_offset_start % FRAME_SIZE)); + std.debug.assert(frame_aligned_file_offset % FRAME_SIZE == 0); - // lookup frame mappings - for (0.., frame_indices) |i, *f_idx| { - const file_offset: FileOffset = @intCast( - (i * FRAME_SIZE) + (file_offset_start - file_offset_start % FRAME_SIZE), + const bytes_read = try file.preadAll( + &self.frames[frame_ref.index], + frame_aligned_file_offset, ); - - const key: FileIdFileOffset = .{ - .file_id = file_id, - .file_offset = file_offset, - }; - - const maybe_frame_idx = blk: { - const frame_map, var frame_map_lg = self.frame_map_rw.readWithLock(); - defer frame_map_lg.unlock(); - break :blk frame_map.get(key); - }; - - if (maybe_frame_idx) |frame_idx| f_idx.* = frame_idx; - } - - return frame_indices; - } - - /// On a "new" frame (i.e. freshly read into), set all of its associated metadata - /// TODO: atomics - fn overwriteDeadFrameInfo( - self: *BufferPool, - f_idx: FrameIndex, - file_id: FileId, - frame_aligned_file_offset: FileOffset, - size: FrameOffset, - ) error{CannotOverwriteAliveInfo}!void { - try self.overwriteDeadFrameInfoNoSize(f_idx, file_id, frame_aligned_file_offset); - self.frames_metadata.size[f_idx] = size; - } - - /// Useful if you don't currently know the size. - /// make sure to set the size later (!) - /// TODO: atomics - fn overwriteDeadFrameInfoNoSize( - self: *BufferPool, - f_idx: FrameIndex, - file_id: FileId, - frame_aligned_file_offset: FileOffset, - ) error{CannotOverwriteAliveInfo}!void { - std.debug.assert(frame_aligned_file_offset % FRAME_SIZE == 0); - - if (self.frames_metadata.rc[f_idx].isAlive()) { - // not-found indices should always have 0 active readers - return error.CannotOverwriteAliveInfo; - } - - self.frames_metadata.freq[f_idx] = 0; - self.frames_metadata.in_queue[f_idx] = .none; - self.frames_metadata.rc[f_idx].reset(); - - self.frames_metadata.key[f_idx] = .{ - .file_id = file_id, - .file_offset = frame_aligned_file_offset, - }; - - const key: FileIdFileOffset = .{ - .file_id = file_id, - .file_offset = frame_aligned_file_offset, - }; - - { - const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); - defer frame_map_lg.unlock(); - frame_map.putAssumeCapacityNoClobber(key, f_idx); + std.debug.assert(bytes_read <= FRAME_SIZE); + self.frame_manager.contains_valid_data[frame_ref.index].store(true, .seq_cst); } - } - - /// Frames with an associated rc of 0 are up for eviction, and which frames - /// are evicted first is up to the LFU. 
- fn evictUnusedFrame(self: *BufferPool) error{CannotResetAlive}!void { - const evicted = self.eviction_lfu.evict(self.frames_metadata); - self.free_list.appendAssumeCapacity(evicted); - const did_remove = blk: { - const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); - defer frame_map_lg.unlock(); - break :blk frame_map.remove(self.frames_metadata.key[evicted]); - }; - if (!did_remove) { - std.debug.panic( - "evicted a frame that did not exist in frame_map, frame: {}\n", - .{evicted}, - ); - } - @memset(&self.frames[evicted], 0xAA); - try self.frames_metadata.resetFrame(evicted); + return AccountDataHandle.initBufferPoolRead( + self, + frame_refs, + @intCast(file_offset_start % FRAME_SIZE), + @intCast(((file_offset_end - 1) % FRAME_SIZE) + 1), + ); } pub fn read( @@ -267,9 +194,9 @@ pub const BufferPool = struct { file_offset_start: FileOffset, /// exclusive file_offset_end: FileOffset, - ) !CachedRead { - return if (use_io_uring) - self.readIoUringSubmitAndWait( + ) ReadError!AccountDataHandle { + const handle = try if (USE_IO_URING) + self.readIoUring( allocator, file, file_id, @@ -284,9 +211,26 @@ pub const BufferPool = struct { file_offset_start, file_offset_end, ); + + return handle; } - fn readIoUringSubmitAndWait( + pub fn computeNumberOfFrameIndices( + /// inclusive + file_offset_start: FileOffset, + /// exclusive + file_offset_end: FileOffset, + ) error{InvalidArgument}!u32 { + if (file_offset_start > file_offset_end) return error.InvalidArgument; + if (file_offset_start == file_offset_end) return 0; + + const starting_frame = file_offset_start / FRAME_SIZE; + const ending_frame = (file_offset_end - 1) / FRAME_SIZE; + + return ending_frame - starting_frame + 1; + } + + fn readIoUring( self: *BufferPool, /// used for temp allocations, and the returned .indices slice allocator: std.mem.Allocator, @@ -296,247 +240,306 @@ pub const BufferPool = struct { file_offset_start: FileOffset, /// exclusive file_offset_end: FileOffset, - ) !CachedRead { - if (!use_io_uring) @compileError("io_uring disabled"); + ) ReadIoUringError!AccountDataHandle { + if (!USE_IO_URING) @compileError("io_uring disabled"); + const threadlocal_io_uring = try ioUring(); - const frame_indices = try self.computeFrameIndices( - file_id, + const frame_refs = try self.frame_manager.getAllIndices( allocator, + file_id, file_offset_start, file_offset_end, ); - errdefer allocator.free(frame_indices); + errdefer allocator.free(frame_refs); + errdefer self.frame_manager.getAllIndicesRollback(frame_refs); + + // read into frames without valid data + var i: u32 = 0; + var n_read: u32 = 0; + while (i < frame_refs.len or n_read < frame_refs.len) { + var queue_full = false; + + if (i < frame_refs.len) { + const frame_ref = &frame_refs[i]; + + const contains_valid_data = self.frame_manager.contains_valid_data[ + frame_ref.index + ].load(.acquire); + if (contains_valid_data) { + n_read += 1; + i += 1; + continue; + } - // update found frames in the LFU (we don't want to evict these in the next loop) - var n_invalid_indices: u32 = 0; - for (frame_indices) |f_idx| { - if (f_idx == INVALID_FRAME) { - n_invalid_indices += 1; - continue; + const frame_aligned_file_offset: FileOffset = @intCast((i * FRAME_SIZE) + + (file_offset_start - file_offset_start % FRAME_SIZE)); + std.debug.assert(frame_aligned_file_offset % FRAME_SIZE == 0); + + _ = threadlocal_io_uring.read( + frame_ref.index, + file.handle, + .{ .buffer = &self.frames[frame_ref.index] }, + frame_aligned_file_offset, + ) catch |err| switch (err) { + 
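// The submission queue is full: submit what we have and drain completions below before retrying this frame's read. +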
error.SubmissionQueueFull => { + queue_full = true; + }, + else => return err, + }; + + if (!queue_full) { + i += 1; + continue; + } } - try self.eviction_lfu.insert(self.frames_metadata, f_idx); - if (!self.frames_metadata.rc[f_idx].acquire()) { - // frame has no handles, but memory is still valid - self.frames_metadata.rc[f_idx].reset(); + + if (queue_full or i >= frame_refs.len) { + // Submit without blocking + const n_submitted = try threadlocal_io_uring.submit(); + std.debug.assert(n_submitted <= IO_URING_ENTRIES); + if (queue_full) std.debug.assert(n_submitted == IO_URING_ENTRIES); + + // Read whatever is still available + var cqe_buf: [IO_URING_ENTRIES]std.os.linux.io_uring_cqe = undefined; + const n_cqes_copied = try threadlocal_io_uring.copy_cqes(&cqe_buf, 0); + for (cqe_buf[0..n_cqes_copied]) |cqe| { + switch (cqe.err()) { + .SUCCESS => {}, + .BADF => return error.NotOpenForReading, // Can be a race condition. + .IO => return error.InputOutput, + .ISDIR => return error.IsDir, + else => |err| return std.posix.unexpectedErrno(err), + } + + const bytes_read: FrameOffset = @intCast(cqe.res); + std.debug.assert(bytes_read > 0); + } + + for (frame_refs[n_read..][0..n_cqes_copied]) |*frame_ref| { + self.frame_manager.contains_valid_data[frame_ref.index].store(true, .seq_cst); + } + n_read += n_cqes_copied; } } - // fill in invalid frames with file data, replacing invalid frames with - // fresh ones. - for (0.., frame_indices) |i, *f_idx| { - if (f_idx.* != INVALID_FRAME) continue; - // INVALID_FRAME => not found, read fresh and populate + return AccountDataHandle.initBufferPoolRead( + self, + frame_refs, + @intCast(file_offset_start % FRAME_SIZE), + @intCast(((file_offset_end - 1) % FRAME_SIZE) + 1), + ); + } +}; - const frame_aligned_file_offset: FileOffset = @intCast((i * FRAME_SIZE) + - (file_offset_start - file_offset_start % FRAME_SIZE)); - std.debug.assert(frame_aligned_file_offset % FRAME_SIZE == 0); +/// Keeps track of all of the data and lifetimes associated with frames. +pub const FrameManager = struct { + /// Uniquely identifies a frame from its file_id and offset. + /// Used for looking up valid frames. + frame_map_rw: sig.sync.RwMux(Map), - f_idx.* = blk: while (true) { - if (self.free_list.popOrNull()) |free_idx| { - break :blk free_idx; - } else { - try self.evictUnusedFrame(); - } + /// Evicts unused frames for reuse. + eviction_lfu: sig.sync.RwMux(HierarchicalFIFO), + + /// Per-frame refcounts. Used to track what frames still have handles associated with them. 
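+    /// A refcount of zero does not invalidate a frame's data; it only makes the frame
+    /// eligible for eviction. HierarchicalFIFO.evict() skips (and reinserts) any frame
+    /// whose count is still nonzero.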
+ frame_ref_counts: []Atomic(u32), + + contains_valid_data: []std.atomic.Value(bool), + + pub const Map = std.AutoHashMapUnmanaged(FileIdFileOffset, FrameIndex); + + const GetError = error{ InvalidArgument, OffsetsOutOfBounds, OutOfMemory }; + + pub fn init(allocator: std.mem.Allocator, num_frames: u32) error{OutOfMemory}!FrameManager { + std.debug.assert(num_frames > 0); + + var frame_map: Map = .{}; + try frame_map.ensureTotalCapacity(allocator, num_frames * 2); + errdefer frame_map.deinit(allocator); + + var eviction_lfu = HierarchicalFIFO.init( + allocator, + @max(num_frames / 10, 1), + num_frames, + ) catch |err| switch (err) { + error.InvalidArgument => unreachable, + error.OutOfMemory => return error.OutOfMemory, + }; + errdefer eviction_lfu.deinit(allocator); + + for (0..num_frames) |i| { + const f_idx: FrameIndex = @intCast(i); + // initially populate so that we can start evicting + eviction_lfu.insert(f_idx); + + const bad_key = FileIdFileOffset{ + .file_id = FileId.fromInt(std.math.maxInt(u32)), + .file_offset = f_idx * 2 + 1, // always odd => always invalid }; - _ = try self.io_uring.read( - f_idx.*, - file.handle, - .{ .buffer = &self.frames[f_idx.*] }, - frame_aligned_file_offset, - ); - try self.overwriteDeadFrameInfoNoSize(f_idx.*, file_id, frame_aligned_file_offset); - try self.eviction_lfu.insert(self.frames_metadata, f_idx.*); + frame_map.putAssumeCapacityNoClobber(bad_key, f_idx); + eviction_lfu.key[f_idx] = bad_key; } - // Wait for our file reads to complete, filling the read length into the metadata as we go. - // (This read length will almost always be FRAME_SIZE, however it will likely be less than - // that at the end of the file) - if (n_invalid_indices > 0) { - const n_submitted = try self.io_uring.submit_and_wait(n_invalid_indices); - std.debug.assert(n_submitted == n_invalid_indices); // did smthng else submit an event? - - // would be nice to get rid of this alloc - const cqes = try allocator.alloc(std.os.linux.io_uring_cqe, n_submitted); - defer allocator.free(cqes); - - // check our completions in order to set the frame's size; - // we need to wait for completion to get the bytes read - const cqe_count = try self.io_uring.copy_cqes(cqes, n_submitted); - std.debug.assert(cqe_count == n_submitted); // why did we not receive them all? 
- for (0.., cqes) |i, cqe| { - if (cqe.err() != .SUCCESS) { - std.debug.panic("cqe err: {}, i: {}", .{ cqe, i }); - } - const f_idx = cqe.user_data; - const bytes_read: FrameOffset = @intCast(cqe.res); - std.debug.assert(bytes_read <= FRAME_SIZE); + const frame_ref_counts = try allocator.alloc(Atomic(u32), num_frames); + errdefer allocator.free(frame_ref_counts); + @memset(frame_ref_counts, .{ .raw = 0 }); + + const contains_valid_data = try allocator.alloc(std.atomic.Value(bool), num_frames); + errdefer allocator.free(contains_valid_data); + @memset(contains_valid_data, std.atomic.Value(bool).init(false)); + + return .{ + .frame_map_rw = sig.sync.RwMux(Map).init(frame_map), + .eviction_lfu = sig.sync.RwMux(HierarchicalFIFO).init(eviction_lfu), + .frame_ref_counts = frame_ref_counts, + .contains_valid_data = contains_valid_data, + }; + } - // TODO: atomics - self.frames_metadata.size[f_idx] = bytes_read; + pub fn deinit(self: *FrameManager, allocator: std.mem.Allocator) void { + const eviction_lfu, var eviction_lfu_lg = self.eviction_lfu.writeWithLock(); + eviction_lfu.deinit(allocator); + eviction_lfu_lg.unlock(); + + const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); + frame_map.deinit(allocator); + frame_map_lg.unlock(); + + for (self.frame_ref_counts, 0..) |*frame_ref_count, i| { + if (frame_ref_count.load(.seq_cst) > 0) { + std.debug.panicExtra( + null, + @returnAddress(), + "BufferPool deinitialised with alive handle: {}\n", + .{i}, + ); } } + allocator.free(self.frame_ref_counts); + allocator.free(self.contains_valid_data); + } - return CachedRead{ - .buffer_pool = self, - .frame_indices = frame_indices, - .first_frame_start_offset = @intCast(file_offset_start % FRAME_SIZE), - .last_frame_end_offset = @intCast(((file_offset_end - 1) % FRAME_SIZE) + 1), - }; + fn numFrames(self: *const FrameManager) u32 { + return @intCast(self.frame_ref_counts.len); } - fn readBlocking( - self: *BufferPool, - /// used for temp allocations, and the returned .indices slice + fn getAllIndices( + self: *FrameManager, allocator: std.mem.Allocator, - file: std.fs.File, file_id: FileId, - /// inclusive file_offset_start: FileOffset, - /// exclusive file_offset_end: FileOffset, - ) (error{ - InvalidArgument, - OutOfMemory, - InvalidKey, - CannotResetAlive, - CannotOverwriteAliveInfo, - OffsetsOutOfBounds, - } || std.posix.PReadError)!CachedRead { - const frame_indices = try self.computeFrameIndices( - file_id, - allocator, + ) GetError![]FrameRef { + const n_indices = try BufferPool.computeNumberOfFrameIndices( file_offset_start, file_offset_end, ); - errdefer allocator.free(frame_indices); - - // update found frames in the LFU (we don't want to evict these in the next loop) - for (frame_indices) |f_idx| { - if (f_idx == INVALID_FRAME) continue; - try self.eviction_lfu.insert(self.frames_metadata, f_idx); - if (!self.frames_metadata.rc[f_idx].acquire()) { - // frame has no handles, but memory is still valid - self.frames_metadata.rc[f_idx].reset(); - } - } + if (n_indices > self.numFrames()) return error.OffsetsOutOfBounds; - // fill in invalid frames with file data, replacing invalid frames with - // fresh ones. 
- for (0.., frame_indices) |i, *f_idx| { - if (f_idx.* != INVALID_FRAME) continue; - // INVALID_FRAME => not found, read fresh and populate + const frame_refs = try allocator.alloc(FrameRef, n_indices); + @memset(frame_refs, FrameRef.INIT); - const frame_aligned_file_offset: FileOffset = @intCast((i * FRAME_SIZE) + - (file_offset_start - file_offset_start % FRAME_SIZE)); - std.debug.assert(frame_aligned_file_offset % FRAME_SIZE == 0); + if (n_indices == 0) return &.{}; - f_idx.* = blk: while (true) { - if (self.free_list.popOrNull()) |free_idx| { - break :blk free_idx; - } else { - try self.evictUnusedFrame(); - } - }; + errdefer comptime unreachable; // We don't error after this point - const bytes_read = try file.pread(&self.frames[f_idx.*], frame_aligned_file_offset); - try self.overwriteDeadFrameInfo( - f_idx.*, - file_id, - frame_aligned_file_offset, - @intCast(bytes_read), - ); - try self.eviction_lfu.insert(self.frames_metadata, f_idx.*); - } + // lookup frame mappings + var n_hits: u32 = 0; + { + const frame_map, var frame_map_lg = self.frame_map_rw.readWithLock(); + defer frame_map_lg.unlock(); - return CachedRead{ - .buffer_pool = self, - .frame_indices = frame_indices, - .first_frame_start_offset = @intCast(file_offset_start % FRAME_SIZE), - .last_frame_end_offset = @intCast(((file_offset_end - 1) % FRAME_SIZE) + 1), - }; - } -}; + for (frame_refs, 0..) |*frame_ref, i| { + const file_offset: FileOffset = @intCast( + (i * FRAME_SIZE) + (file_offset_start - file_offset_start % FRAME_SIZE), + ); + + const key: FileIdFileOffset = .{ + .file_id = file_id, + .file_offset = file_offset, + }; + + const cache_hit_f_idx = frame_map.get(key) orelse continue; + n_hits += 1; + frame_ref.found_in_cache = true; + frame_ref.index = cache_hit_f_idx; + _ = self.frame_ref_counts[frame_ref.index].fetchAdd(1, .seq_cst); + } + } -/// TODO: atomics on all index accesses -pub const FramesMetadata = struct { - pub const InQueue = enum(u2) { none, small, main, ghost }; + if (n_hits < frame_refs.len) { + const frame_map, var frame_map_lg = self.frame_map_rw.writeWithLock(); + defer frame_map_lg.unlock(); - /// ref count for the frame. For frames that are currently being used elsewhere. - rc: []sig.sync.ReferenceCounter, + const eviction_lfu, var eviction_lfu_lg = self.eviction_lfu.writeWithLock(); + defer eviction_lfu_lg.unlock(); - /// effectively the inverse of BufferPool.FrameMap, used in order to - /// evict keys by their value - key: []FileIdFileOffset, + for (frame_refs, 0..) |*frame_ref, i| { + if (frame_ref.found_in_cache) { + std.debug.assert(frame_ref.index != INVALID_FRAME); + continue; + } + std.debug.assert(frame_ref.index == INVALID_FRAME); // missed frame with valid idx? + + const file_offset: FileOffset = @intCast( + (i * FRAME_SIZE) + (file_offset_start - file_offset_start % FRAME_SIZE), + ); + + const key: FileIdFileOffset = .{ + .file_id = file_id, + .file_offset = file_offset, + }; + + // Couldve been added between readLock() and us. If so, just ref it. + // If not, upsert to value-index that we'll get from eviction below. + const entry = frame_map.getOrPutAssumeCapacity(key); + if (entry.found_existing) { + frame_ref.index = entry.value_ptr.*; + _ = self.frame_ref_counts[frame_ref.index].fetchAdd(1, .seq_cst); + continue; + } - /// frequency for the S3_FIFO - /// Yes, really, only 0, 1, 2, 3. 
- freq: []u2, + const evicted_f_idx = eviction_lfu.evict(self.frame_ref_counts); - /// which S3_FIFO queue this frame exists in - in_queue: []InQueue, + const evicted_key = eviction_lfu.key[evicted_f_idx]; - /// 0..=512 - size: []FrameOffset, + frame_ref.index = evicted_f_idx; + self.frame_ref_counts[frame_ref.index].store(1, .seq_cst); + self.contains_valid_data[frame_ref.index].store(false, .seq_cst); - fn init(allocator: std.mem.Allocator, num_frames: usize) !FramesMetadata { - const rc = try allocator.alignedAlloc( - sig.sync.ReferenceCounter, - std.mem.page_size, - num_frames, - ); - errdefer allocator.free(rc); - @memset(rc, .{ .state = .{ .raw = 0 } }); + eviction_lfu.insert(frame_ref.index); - const key = try allocator.alignedAlloc(FileIdFileOffset, std.mem.page_size, num_frames); - errdefer allocator.free(key); - @memset(key, .{ .file_id = FileId.fromInt(0), .file_offset = 0 }); + entry.value_ptr.* = frame_ref.index; + eviction_lfu.key[frame_ref.index] = key; - const freq = try allocator.alignedAlloc(u2, std.mem.page_size, num_frames); - errdefer allocator.free(freq); - @memset(freq, 0); + std.debug.assert(!std.meta.eql(evicted_key, key)); // inserted key we just evicted - const in_queue = try allocator.alignedAlloc(InQueue, std.mem.page_size, num_frames); - errdefer allocator.free(in_queue); - @memset(in_queue, .none); + const removed = frame_map.remove(evicted_key); + std.debug.assert(removed); // evicted key was not in map + } + } - const size = try allocator.alignedAlloc(FrameOffset, std.mem.page_size, num_frames); - errdefer allocator.free(size); - @memset(size, 0); + for (frame_refs) |frame_ref| std.debug.assert(frame_ref.index != INVALID_FRAME); - return .{ - .rc = rc, - .key = key, - .freq = freq, - .in_queue = in_queue, - .size = size, - }; + return frame_refs; } - fn deinit(self: *FramesMetadata, allocator: std.mem.Allocator) void { - // NOTE: this check itself is racy, but should never happen - for (self.rc) |*rc| { - if (rc.isAlive()) { - @panic("BufferPool deinitialised with alive handles"); + /// To be called on read failure. This should be extremely rare. + fn getAllIndicesRollback(self: *FrameManager, frame_refs: []FrameRef) void { + @setCold(true); + + // rollback rcs + for (frame_refs) |f_ref| _ = self.frame_ref_counts[f_ref.index].fetchSub(1, .seq_cst); + + // re-insert evicted indices + { + const eviction_lfu, var eviction_lfu_lg = self.eviction_lfu.writeWithLock(); + defer eviction_lfu_lg.unlock(); + + for (frame_refs) |f_ref| { + if (f_ref.found_in_cache == false) eviction_lfu.insert(f_ref.index); } } - allocator.free(self.rc); - allocator.free(self.key); - allocator.free(self.freq); - allocator.free(self.in_queue); - allocator.free(self.size); - self.* = undefined; - } - - // to be called on the eviction of a frame - // should never be called on a frame with rc>0 - // TODO: this should *all* be atomic (!) - fn resetFrame(self: FramesMetadata, index: FrameIndex) error{CannotResetAlive}!void { - if (self.rc[index].isAlive()) return error.CannotResetAlive; - self.freq[index] = 0; - self.in_queue[index] = .none; - self.size[index] = 0; - self.key[index] = FileIdFileOffset.INVALID; } }; @@ -556,13 +559,25 @@ pub const FramesMetadata = struct { /// eviction when no free frames can be made available is illegal behaviour. 
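+/// In short, an S3-FIFO-style scheme (a summary of insert() below): new keys enter
+/// `small`; a key evicted from `small` moves to `main` if it was hit again (freq > 0),
+/// otherwise to `ghost`; a hit on a `ghost` key promotes it back into `main`.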
pub const HierarchicalFIFO = struct { pub const Key = FrameIndex; - pub const Metadata = FramesMetadata; - pub const Fifo = std.fifo.LinearFifo(FrameIndex, .Slice); // TODO: atomics + pub const Fifo = std.fifo.LinearFifo(FrameIndex, .Slice); + + pub const InQueue = enum(u8) { none, small, main, ghost }; // u8 required for extern usage small: Fifo, main: Fifo, // probably-alive items ghost: Fifo, // probably-dead items + /// Effectively the inverse of FrameManager.Map, used in order to remove Map entries by their + /// value when their key is evicted. + key: []FileIdFileOffset, + + /// Frequency for the HierarchicalFIFO entries. + /// Yes, really, only 0, 1, 2, 3. + freq: []u2, + + /// Which queue each frame exists in. + in_queue: []InQueue, + pub fn init( allocator: std.mem.Allocator, small_size: u32, @@ -579,10 +594,25 @@ pub const HierarchicalFIFO = struct { const ghost_buf = try allocator.alloc(FrameIndex, num_frames); errdefer allocator.free(ghost_buf); + const key = try allocator.alloc(FileIdFileOffset, num_frames); + errdefer allocator.free(key); + @memset(key, .{ .file_id = FileId.fromInt(0), .file_offset = 0 }); + + const freq = try allocator.alloc(u2, num_frames); + errdefer allocator.free(freq); + @memset(freq, 0); + + const in_queue = try allocator.alloc(InQueue, num_frames); + errdefer allocator.free(in_queue); + @memset(in_queue, .none); + return .{ .small = Fifo.init(small_buf), .main = Fifo.init(main_buf), .ghost = Fifo.init(ghost_buf), + .key = key, + .freq = freq, + .in_queue = in_queue, }; } @@ -590,6 +620,11 @@ pub const HierarchicalFIFO = struct { allocator.free(self.small.buf); allocator.free(self.main.buf); allocator.free(self.ghost.buf); + + allocator.free(self.key); + allocator.free(self.freq); + allocator.free(self.in_queue); + self.* = undefined; } @@ -597,39 +632,33 @@ pub const HierarchicalFIFO = struct { return @intCast(self.main.buf.len); } - pub fn insert( - self: *HierarchicalFIFO, - metadata: Metadata, - key: Key, - ) error{InvalidKey}!void { - if (key == INVALID_FRAME) return error.InvalidKey; - - switch (metadata.in_queue[key]) { + pub fn insert(self: *HierarchicalFIFO, key: Key) void { + switch (self.in_queue[key]) { .main, .small => { - metadata.freq[key] +|= 1; + self.freq[key] +|= 1; }, .ghost => { - std.debug.assert(metadata.freq[key] == 0); - metadata.freq[key] = 1; + std.debug.assert(self.freq[key] == 0); + self.freq[key] = 1; // Add key to main too - important to note that the key *still* // exists within ghost, but from now on we'll ignore that entry. self.main.writeItemAssumeCapacity(key); - metadata.in_queue[key] = .main; + self.in_queue[key] = .main; }, .none => { if (self.small.writableLength() == 0) { const popped_small = self.small.readItem().?; - if (metadata.freq[popped_small] == 0) { + if (self.freq[popped_small] == 0) { self.ghost.writeItemAssumeCapacity(popped_small); - metadata.in_queue[popped_small] = .ghost; + self.in_queue[popped_small] = .ghost; } else { self.main.writeItemAssumeCapacity(popped_small); - metadata.in_queue[popped_small] = .main; + self.in_queue[popped_small] = .main; } } self.small.writeItemAssumeCapacity(key); - metadata.in_queue[key] = .small; + self.in_queue[key] = .small; }, } } @@ -638,24 +667,24 @@ pub const HierarchicalFIFO = struct { /// This does not return an optional, as the caller *requires* a key to be /// evicted. Not being able to return a key means illegal internal state in /// the BufferPool. 
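+    /// Frames whose refcount is still nonzero are reinserted into `main` (with freq set
+    /// to 1) and the search continues; only a frame with no live handles is returned.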
- pub fn evict(self: *HierarchicalFIFO, metadata: Metadata) Key { + pub fn evict(self: *HierarchicalFIFO, frame_ref_counts: []const Atomic(u32)) Key { var alive_eviction_attempts: usize = 0; const dead_key: Key = while (true) { var maybe_evicted: ?Key = null; - if (maybe_evicted == null) maybe_evicted = self.evictGhost(metadata); + if (maybe_evicted == null) maybe_evicted = self.evictGhost(); // if we keep failing to evict a dead key, start alternating between // evicting from main and small. This saves us from the rare case // that every key in main is alive. In normal conditions, main // should be evicted from first. if (alive_eviction_attempts < 10 or alive_eviction_attempts % 2 == 0) { - if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(metadata, .main); - if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(metadata, .small); + if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(.main); + if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(.small); } else { - if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(metadata, .small); - if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(metadata, .main); + if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(.small); + if (maybe_evicted == null) maybe_evicted = self.evictSmallOrMain(.main); } // NOTE: This panic is effectively unreachable - an empty cache @@ -666,25 +695,25 @@ pub const HierarchicalFIFO = struct { @panic("unable to evict: cache empty"); // see above comment // alive evicted keys are reinserted, we try again - if (metadata.rc[evicted].isAlive()) { - metadata.freq[evicted] = 1; + if (frame_ref_counts[evicted].load(.seq_cst) > 0) { + self.freq[evicted] = 1; self.main.writeItemAssumeCapacity(evicted); - metadata.in_queue[evicted] = .main; + self.in_queue[evicted] = .main; alive_eviction_attempts += 1; continue; } // key is definitely dead - metadata.in_queue[evicted] = .none; + self.in_queue[evicted] = .none; break evicted; }; return dead_key; } - fn evictGhost(self: *HierarchicalFIFO, metadata: Metadata) ?Key { + fn evictGhost(self: *HierarchicalFIFO) ?Key { const evicted: ?Key = while (self.ghost.readItem()) |ghost_key| { - switch (metadata.in_queue[ghost_key]) { + switch (self.in_queue[ghost_key]) { .ghost => { break ghost_key; }, @@ -707,7 +736,6 @@ pub const HierarchicalFIFO = struct { fn evictSmallOrMain( self: *HierarchicalFIFO, - metadata: Metadata, comptime target_queue: enum { small, main }, ) ?Key { const queue = switch (target_queue) { @@ -717,14 +745,14 @@ pub const HierarchicalFIFO = struct { const evicted: ?Key = while (queue.readItem()) |popped_key| { switch (target_queue) { - .small => if (metadata.in_queue[popped_key] != .small) unreachable, - .main => if (metadata.in_queue[popped_key] != .main) unreachable, + .small => if (self.in_queue[popped_key] != .small) unreachable, + .main => if (self.in_queue[popped_key] != .main) unreachable, } - if (metadata.freq[popped_key] == 0) { + if (self.freq[popped_key] == 0) { break popped_key; } else { - metadata.freq[popped_key] -|= 1; + self.freq[popped_key] -= 1; queue.writeItemAssumeCapacity(popped_key); } } else null; @@ -735,190 +763,370 @@ pub const HierarchicalFIFO = struct { /// slice-like datatype /// view over one or more buffers owned by the BufferPool -pub const CachedRead = struct { - buffer_pool: *BufferPool, - frame_indices: []const FrameIndex, - /// inclusive, the offset into the first frame - first_frame_start_offset: FrameOffset, - /// exclusive, the offset into 
the last frame - last_frame_end_offset: FrameOffset, +pub const AccountDataHandle = union(enum) { + /// Data owned by BufferPool, returned by .read() - do not construct this yourself (!) + buffer_pool_read: BufferPoolRead, + + /// Data allocated elsewhere, not owned or created by BufferPool. BufferPool will deallocate. + owned_allocation: []const u8, + /// Data allocated elsewhere, not owned or created by BufferPool. + unowned_allocation: []const u8, + /// Data owned by parent AccountDataHandle + sub_read: SubRead, + /// Used in place of a read, in callsites where it is not actually needed. Provides .len(). + empty: Empty, + + const BufferPoolRead = struct { + buffer_pool: *BufferPool, + frame_refs: []FrameRef, + /// inclusive, the offset into the first frame + first_frame_start_offset: FrameOffset, + /// exclusive, the offset into the last frame + last_frame_end_offset: FrameOffset, + }; - pub const Iterator = struct { - cached_read: *const CachedRead, - bytes_read: u32 = 0, + const SubRead = packed struct(u128) { + parent: *const AccountDataHandle, + // offset into the parent's read + start: u32, + end: u32, + }; - const Reader = std.io.GenericReader(*Iterator, error{}, readBytes); + const Empty = struct { + len: u32, + }; - pub fn next(self: *Iterator) ?u8 { - if (self.bytes_read == self.cached_read.len()) { - return null; - } - defer self.bytes_read += 1; - return self.cached_read.readByte(self.bytes_read); - } + pub const @"!bincode-config" = bincode.FieldConfig(AccountDataHandle){ + .deserializer = bincodeDeserialize, + .serializer = bincodeSerialize, + .free = bincodeFree, + }; + + /// Only called by the BufferPool + fn initBufferPoolRead( + buffer_pool: *BufferPool, + frame_refs: []FrameRef, + first_frame_start_offset: FrameOffset, + last_frame_end_offset: FrameOffset, + ) AccountDataHandle { + return .{ + .buffer_pool_read = .{ + .buffer_pool = buffer_pool, + .frame_refs = frame_refs, + .first_frame_start_offset = first_frame_start_offset, + .last_frame_end_offset = last_frame_end_offset, + }, + }; + } - pub fn reset(self: *Iterator) void { - self.bytes_read = 0; + /// External to the BufferPool, data will be freed upon .deinit + pub fn initAllocatedOwned(data: []const u8) AccountDataHandle { + return AccountDataHandle{ .owned_allocation = data }; + } + + /// External to the BufferPool + pub fn initAllocated(data: []const u8) AccountDataHandle { + return AccountDataHandle{ .unowned_allocation = data }; + } + + pub fn initEmpty(length: u32) AccountDataHandle { + return .{ .empty = .{ .len = length } }; + } + + pub fn deinit(self: AccountDataHandle, allocator: std.mem.Allocator) void { + switch (self) { + .buffer_pool_read => |*buffer_pool_read| { + for (buffer_pool_read.frame_refs) |frame_ref| { + std.debug.assert(frame_ref.index != INVALID_FRAME); + const prev_rc = buffer_pool_read.buffer_pool.frame_manager.frame_ref_counts[ + frame_ref.index + ].fetchSub(1, .seq_cst); + std.debug.assert(prev_rc != 0); // deinit of dead frame + } + + allocator.free(buffer_pool_read.frame_refs); + }, + .owned_allocation => |owned_allocation| { + allocator.free(owned_allocation); + }, + .sub_read, + .unowned_allocation, + .empty, + => {}, } + } - pub fn readBytes(self: *Iterator, buffer: []u8) error{}!usize { - var i: u32 = 0; - while (i < buffer.len) : (i += 1) { - buffer[i] = self.next() orelse break; - } - return i; + pub fn iterator(self: *const AccountDataHandle) Iterator { + return .{ .read_handle = self, .start = 0, .end = self.len() }; + } + + /// Copies all data into specified buffer. 
Buf.len === self.len() + pub fn readAll(self: AccountDataHandle, buf: []u8) void { + std.debug.assert(buf.len == self.len()); + self.read(0, buf); + } + + pub fn readAllAllocate(self: AccountDataHandle, allocator: std.mem.Allocator) ![]u8 { + return self.readAllocate(allocator, 0, self.len()); + } + + /// Copies data into specified buffer. + pub fn read( + self: *const AccountDataHandle, + start: FileOffset, + buf: []u8, + ) void { + const end: FileOffset = @intCast(start + buf.len); + + switch (self.*) { + .owned_allocation, .unowned_allocation => |data| return @memcpy(buf, data[start..end]), + .sub_read => |*sb| return sb.parent.read(sb.start + start, buf), + .empty => unreachable, + .buffer_pool_read => {}, } - pub fn reader(self: *Iterator) Reader { - return .{ .context = self }; + var bytes_copied: u32 = 0; + var iter = self.iteratorRanged(start, end); + while (iter.nextFrame()) |frame_slice| { + const copy_len = @min(frame_slice.len, buf.len - bytes_copied); + @memcpy( + buf[bytes_copied..][0..copy_len], + frame_slice[0..copy_len], + ); + bytes_copied += @intCast(copy_len); } - }; + } - pub fn readByte(self: CachedRead, index: usize) u8 { - std.debug.assert(self.frame_indices.len != 0); - std.debug.assert(index < self.len()); - const offset = index + self.first_frame_start_offset; - std.debug.assert(offset >= self.first_frame_start_offset); + pub fn readAllocate( + self: AccountDataHandle, + allocator: std.mem.Allocator, + start: FileOffset, + end: FileOffset, + ) ![]u8 { + const buf = try allocator.alloc(u8, end - start); + self.read(start, buf); + return buf; + } - return self.buffer_pool.frames[ - self.frame_indices[offset / FRAME_SIZE] - ][offset % FRAME_SIZE]; + pub fn len(self: AccountDataHandle) u32 { + return switch (self) { + .sub_read => |sr| sr.end - sr.start, + .buffer_pool_read => |buffer_pool_read| { + if (buffer_pool_read.frame_refs.len == 0) return 0; + return (@as(u32, @intCast(buffer_pool_read.frame_refs.len)) - 1) * + FRAME_SIZE + + buffer_pool_read.last_frame_end_offset - + buffer_pool_read.first_frame_start_offset; + }, + .empty => |empty| empty.len, + .owned_allocation, .unowned_allocation => |data| @intCast(data.len), + }; } - /// Copies entire read into specified buffer. Must be correct length. 
- pub fn readAll( - self: CachedRead, - buf: []u8, - ) error{InvalidArgument}!void { - if (buf.len != self.len()) return error.InvalidArgument; - var bytes_copied: usize = 0; - - for (0.., self.frame_indices) |i, f_idx| { - const is_first_frame: u2 = @intFromBool(i == 0); - const is_last_frame: u2 = @intFromBool(i == self.frame_indices.len - 1); - - switch (is_first_frame << 1 | is_last_frame) { - 0b00 => { // !first, !last (middle frame) - const read_len = FRAME_SIZE; - @memcpy( - buf[bytes_copied..][0..read_len], - &self.buffer_pool.frames[f_idx], - ); - bytes_copied += read_len; - }, - 0b10 => { // first, !last (first frame) - std.debug.assert(i == 0); - const read_len = FRAME_SIZE - self.first_frame_start_offset; - @memcpy( - buf[0..read_len], - self.buffer_pool.frames[f_idx][self.first_frame_start_offset..], - ); - bytes_copied += read_len; - }, - 0b01 => { // !first, last (last frame) - const read_len = self.last_frame_end_offset; - @memcpy( - buf[bytes_copied..][0..read_len], - self.buffer_pool.frames[f_idx][0..read_len], - ); - bytes_copied += read_len; - std.debug.assert(bytes_copied == self.len()); - }, - 0b11 => { // first, last (only frame) - std.debug.assert(self.frame_indices.len == 1); - const readable_len = self.len(); - @memcpy( - buf[0..readable_len], - self.buffer_pool.frames[ - f_idx - ][self.first_frame_start_offset..self.last_frame_end_offset], - ); - bytes_copied += self.len(); - }, - } + pub fn iteratorRanged( + self: *const AccountDataHandle, + start: FileOffset, + end: FileOffset, + ) Iterator { + std.debug.assert(self.len() >= end); + std.debug.assert(end >= start); + + return .{ .read_handle = self, .start = start, .end = end }; + } + + pub fn dupeAllocatedOwned( + self: AccountDataHandle, + allocator: std.mem.Allocator, + ) !AccountDataHandle { + const data_copy = try self.readAllAllocate(allocator); + return initAllocatedOwned(data_copy); + } + + pub fn duplicateBufferPoolRead( + self: AccountDataHandle, + allocator: std.mem.Allocator, + ) !AccountDataHandle { + switch (self) { + .buffer_pool_read => |*buffer_pool_read| { + const refs = try allocator.dupe(FrameRef, buffer_pool_read.frame_refs); + for (refs) |ref| { + const prev_rc = buffer_pool_read.buffer_pool.frame_manager.frame_ref_counts[ + ref.index + ].fetchAdd(1, .seq_cst); + std.debug.assert(prev_rc > 0); // duplicated AccountDataHandle with dead frame + } + return AccountDataHandle.initBufferPoolRead( + buffer_pool_read.buffer_pool, + refs, + buffer_pool_read.first_frame_start_offset, + buffer_pool_read.last_frame_end_offset, + ); + }, + else => unreachable, // duplicateBufferPoolRead called with handle not from BufferPool. 
+        }
+    }
 
-    pub fn iterator(self: *const CachedRead) Iterator {
-        return .{ .cached_read = self };
+    pub fn slice(self: *const AccountDataHandle, start: usize, end: usize) AccountDataHandle {
+        return .{ .sub_read = .{
+            .end = @intCast(end),
+            .start = @intCast(start),
+            .parent = self,
+        } };
     }
 
-    pub fn len(self: CachedRead) u32 {
-        if (self.frame_indices.len == 0) return 0;
-        return (@as(u32, @intCast(self.frame_indices.len)) - 1) *
-            FRAME_SIZE + self.last_frame_end_offset - self.first_frame_start_offset;
+    /// testing purposes only
+    pub fn expectEqual(expected: AccountDataHandle, actual: AccountDataHandle) !void {
+        if (!builtin.is_test)
+            @compileError("AccountDataHandle.expectEqual is for testing purposes only");
+        const expected_buf = try expected.readAllocate(std.testing.allocator, 0, expected.len());
+        defer std.testing.allocator.free(expected_buf);
+        const actual_buf = try actual.readAllocate(std.testing.allocator, 0, actual.len());
+        defer std.testing.allocator.free(actual_buf);
+        try std.testing.expectEqualSlices(u8, expected_buf, actual_buf);
     }
 
-    pub fn deinit(self: CachedRead, allocator: std.mem.Allocator) void {
-        for (self.frame_indices) |frame_index| {
-            std.debug.assert(frame_index != INVALID_FRAME);
+    pub fn eql(h1: AccountDataHandle, h2: AccountDataHandle) bool {
+        if (std.meta.eql(h1, h2)) return true;
+        if (h1.len() != h2.len()) return false;
 
-            if (self.buffer_pool.frames_metadata.rc[frame_index].release()) {
-                // notably, the frame remains in memory, and its hashmap entry
-                // remains valid.
-            }
+        var h1_iter = h1.iterator();
+        var h2_iter = h2.iterator();
+
+        while (h1_iter.nextByte()) |h1_byte| {
+            const h2_byte = h2_iter.nextByte().?;
+            if (h1_byte != h2_byte) return false;
         }
-        allocator.free(self.frame_indices);
+
+        return true;
     }
-};
 
-/// Used for atomic appends + pops; No guarantees for elements.
-/// Methods follow that of ArrayListUnmanaged
-pub fn AtomicStack(T: type) type {
-    return struct {
-        const Self = @This();
-
-        buf: [*]T,
-        len: std.atomic.Value(usize),
-        cap: usize, // fixed
-
-        fn init(allocator: std.mem.Allocator, cap: usize) !Self {
-            const buf = try allocator.alloc(T, cap);
-            return .{
-                .buf = buf.ptr,
-                .len = .{ .raw = 0 },
-                .cap = cap,
-            };
+    pub fn eqlSlice(self: AccountDataHandle, data: []const u8) bool {
+        if (self.len() != data.len) return false;
+
+        var iter = self.iterator();
+        var i: u32 = 0;
+        while (iter.nextFrame()) |frame_slice| : (i += @intCast(frame_slice.len)) {
+            if (!std.mem.eql(u8, frame_slice, data[i..][0..frame_slice.len])) return false;
         }
 
-        fn deinit(self: *Self, allocator: std.mem.Allocator) void {
-            allocator.free(self.buf[0..self.cap]);
-            self.* = undefined;
+        return true;
+    }
+
+    pub const Iterator = struct {
+        read_handle: *const AccountDataHandle,
+        bytes_read: FileOffset = 0,
+        start: FileOffset,
+        end: FileOffset,
+
+        pub const Reader = std.io.GenericReader(*Iterator, error{}, Iterator.readBytes);
+
+        pub fn reader(self: *Iterator) Reader {
+            return .{ .context = self };
         }
 
-        // add new item to the end of buf, incrementing self.len atomically
-        fn appendAssumeCapacity(self: *Self, item: T) void {
-            const prev_len = self.len.load(.acquire);
-            std.debug.assert(prev_len < self.cap);
-            self.buf[prev_len] = item;
-            _ = self.len.fetchAdd(1, .release);
+        pub fn len(self: Iterator) FileOffset {
+            return self.end - self.start;
         }
 
-        // return item at end of buf, decrementing self.len atomically
-        fn popOrNull(self: *Self) ?T {
-            const prev_len = self.len.fetchSub(1, .acquire);
-            if (prev_len == 0) {
-                _ = self.len.fetchAdd(1, .release);
-                return null;
-            }
-            return self.buf[prev_len - 1];
+        pub fn bytesRemaining(self: Iterator) FileOffset {
+            return self.len() - self.bytes_read;
+        }
+
+        pub fn readBytes(self: *Iterator, buffer: []u8) error{}!usize {
+            if (self.bytes_read == self.end) return 0;
+
+            const read_len = @min(self.bytesRemaining(), buffer.len);
+
+            self.read_handle.read(self.start + self.bytes_read, buffer[0..read_len]);
+            self.bytes_read += @intCast(read_len);
+            return read_len;
+        }
+
+        pub fn nextByte(self: *Iterator) ?u8 {
+            var buf: u8 = undefined;
+            const buf_len = self.readBytes((&buf)[0..1]) catch unreachable;
+            if (buf_len > 1) unreachable;
+            if (buf_len == 0) return null;
+            return buf;
+        }
+
+        /// Does not copy, reads buffers of up to FRAME_SIZE at a time.
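+        /// Returned slices borrow from the pool's frames (or from the handle's
+        /// backing allocation) and are invalidated once this handle is deinited.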
+ pub fn nextFrame(self: *Iterator) ?[]const u8 { + if (self.bytesRemaining() == 0) return null; + + const first_frame_offset: FrameIndex = switch (self.read_handle.*) { + .buffer_pool_read => |*buffer_pool_read| buffer_pool_read.first_frame_start_offset, + else => 0, + }; + + // an index we can use with the bufferpool directly + const read_offset: FileOffset = self.start + first_frame_offset + self.bytes_read; + + const frame_buf = switch (self.read_handle.*) { + .buffer_pool_read => |*buffer_pool_read| buf: { + const current_frame: FrameIndex = @intCast(read_offset / FRAME_SIZE); + + const frame_start: FrameOffset = @intCast(read_offset % FRAME_SIZE); + const frame_end: FrameOffset = if (current_frame == + buffer_pool_read.frame_refs.len - 1) + buffer_pool_read.last_frame_end_offset + else + FRAME_SIZE; + + const end_idx = @min(frame_end, frame_start + self.bytesRemaining()); + + const buf = buffer_pool_read.buffer_pool.frames[ + buffer_pool_read.frame_refs[current_frame].index + ][frame_start..end_idx]; + + break :buf buf; + }, + .owned_allocation, .unowned_allocation => |external| buf: { + const end_idx = @min( + read_offset + FRAME_SIZE, + read_offset + self.bytesRemaining(), + ); + break :buf external[read_offset..end_idx]; + }, + .empty => unreachable, + .sub_read => @panic("unimplemented"), + }; + + if (frame_buf.len == 0) unreachable; // guarded against by the bytes_read check + if (self.bytes_read > self.len()) unreachable; // we've gone too far + + self.bytes_read += @intCast(frame_buf.len); + return frame_buf; } }; -} -test AtomicStack { - const allocator = std.testing.allocator; - var stack = try AtomicStack(usize).init(allocator, 100); - defer stack.deinit(allocator); + fn bincodeSerialize( + writer: anytype, + read_handle: anytype, + params: bincode.Params, + ) anyerror!void { + // we want to serialise it as if it's a slice + try bincode.write(writer, @as(u64, read_handle.len()), params); + + var iter = read_handle.iterator(); + while (iter.nextFrame()) |frame| { + try writer.writeAll(frame); + } + } - for (0..100) |i| stack.appendAssumeCapacity(i); + fn bincodeDeserialize( + alloc: std.mem.Allocator, + reader: anytype, + params: bincode.Params, + ) anyerror!AccountDataHandle { + const data = try bincode.read(alloc, []u8, reader, params); + return AccountDataHandle.initAllocatedOwned(data); + } - var i: usize = 100; - while (i > 0) { - i -= 1; - try std.testing.expectEqual(i, stack.popOrNull()); + fn bincodeFree(allocator: std.mem.Allocator, read_handle: anytype) void { + read_handle.deinit(allocator); } -} +}; test "BufferPool indicesRequired" { const TestCase = struct { @@ -936,11 +1144,11 @@ test "BufferPool indicesRequired" { .{ .start = F_SIZE, .end = F_SIZE * 2, .expected = 1 }, }; - for (0.., cases) |i, case| { + for (cases, 0..) 
|case, i| { errdefer std.debug.print("failed on case(i={}): {}", .{ i, case }); try std.testing.expectEqual( case.expected, - BufferPool.computeNumberofFrameIndices(case.start, case.end), + BufferPool.computeNumberOfFrameIndices(case.start, case.end), ); } } @@ -948,15 +1156,31 @@ test "BufferPool indicesRequired" { test "BufferPool init deinit" { const allocator = std.testing.allocator; - for (0.., &[_]u32{ - 2, 3, 4, 8, - 16, 32, 256, 4096, - 16384, 16385, 24576, 32767, - 32768, 49152, 65535, 65536, - }) |i, frame_count| { + const initDeinit = struct { + fn f(alloc: std.mem.Allocator, n_frames: u32) !void { + var bp = try BufferPool.init(alloc, n_frames); + defer bp.deinit(alloc); + } + }.f; + + for ( + &[_]u32{ + 2, 3, 4, 8, + 16, 32, 256, 4096, + 16384, 16385, 24576, 32767, + 32768, 49152, 65535, 65536, + }, + 0.., + ) |frame_count, i| { errdefer std.debug.print("failed on case(i={}): {}", .{ i, frame_count }); var bp = try BufferPool.init(allocator, frame_count); bp.deinit(allocator); + + try std.testing.checkAllAllocationFailures( + std.testing.allocator, + initDeinit, + .{frame_count}, + ); } } @@ -975,7 +1199,7 @@ test "BufferPool readBlocking" { } test "BufferPool readIoUringSubmitAndWait" { - if (!use_io_uring) return error.SkipZigTest; + if (!USE_IO_URING) return error.SkipZigTest; const allocator = std.testing.allocator; @@ -986,7 +1210,7 @@ test "BufferPool readIoUringSubmitAndWait" { var bp = try BufferPool.init(allocator, 2048); // 2048 frames = 1MiB defer bp.deinit(allocator); - var read = try bp.readIoUringSubmitAndWait(allocator, file, file_id, 0, 1000); + var read = try bp.readIoUring(allocator, file, file_id, 0, 1000); defer read.deinit(allocator); } @@ -997,7 +1221,7 @@ test "BufferPool basic usage" { defer file.close(); const file_id = FileId.fromInt(1); - var bp = try BufferPool.init(allocator, 2048); // 2048 frames = 1MiB + var bp = try BufferPool.init(allocator, 2048); // 2048 frames = 1MiB @ FRAME_SIZE=512 defer bp.deinit(allocator); var fba_buf: [4096]u8 = undefined; @@ -1006,8 +1230,12 @@ test "BufferPool basic usage" { const read = try bp.read(fba.allocator(), file, file_id, 0, 1000); defer read.deinit(fba.allocator()); - try std.testing.expectEqual(2, read.frame_indices.len); - for (read.frame_indices) |f_idx| try std.testing.expect(f_idx != INVALID_FRAME); + try std.testing.expectEqual( + try std.math.divCeil(usize, 1000, FRAME_SIZE), + read.buffer_pool_read.frame_refs.len, + ); + for (read.buffer_pool_read.frame_refs) |frame_ref| + try std.testing.expect(frame_ref.index != INVALID_FRAME); { var iter1 = read.iterator(); @@ -1021,7 +1249,7 @@ test "BufferPool basic usage" { defer fba.allocator().free(iter_data); var bytes_read: usize = 0; - while (iter2.next()) |byte| : (bytes_read += 1) { + while (iter2.nextByte()) |byte| : (bytes_read += 1) { iter_data[bytes_read] = byte; } try std.testing.expectEqual(1000, bytes_read); @@ -1044,9 +1272,12 @@ test "BufferPool allocation sizes" { // except for the s3_fifo queues, which are split to be ~90% and ~10% of that // length. 
var total_requested_bytes = gpa.total_requested_bytes; - total_requested_bytes -= bp.eviction_lfu.ghost.buf.len * @sizeOf(FrameIndex); - total_requested_bytes -= bp.eviction_lfu.main.buf.len * @sizeOf(FrameIndex); - total_requested_bytes -= bp.eviction_lfu.small.buf.len * @sizeOf(FrameIndex); + total_requested_bytes -= bp.frame_manager.eviction_lfu.readField("ghost").buf.len * + @sizeOf(FrameIndex); + total_requested_bytes -= bp.frame_manager.eviction_lfu.readField("main").buf.len * + @sizeOf(FrameIndex); + total_requested_bytes -= bp.frame_manager.eviction_lfu.readField("small").buf.len * + @sizeOf(FrameIndex); total_requested_bytes -= @sizeOf(usize) * 3; // hashmap header try std.testing.expect(total_requested_bytes % frame_count == 0); @@ -1055,7 +1286,7 @@ test "BufferPool allocation sizes" { // As of writing, all metadata (excluding eviction_lfu, including frame_map) // is 50 bytes or ~9% of memory usage at a frame size of 512, or 50MB for a // million frames. - try std.testing.expect((total_requested_bytes / frame_count) - 512 <= 64); + try std.testing.expect((total_requested_bytes / frame_count) - FRAME_SIZE <= 80); } test "BufferPool filesize > frame_size * num_frames" { @@ -1080,14 +1311,6 @@ test "BufferPool filesize > frame_size * num_frames" { // file_size > total buffers size => we evict as we go var offset: u32 = 0; while (offset < file_size) : (offset += FRAME_SIZE) { - // when we've already filled every frame, we evict as we go - // => free list should be empty - if (offset >= FRAME_SIZE * num_frames) { - try std.testing.expectEqual(0, bp.free_list.len.raw); - } else { - try std.testing.expect(bp.free_list.len.raw > 0); - } - const read_frame = try bp.read( allocator, file, @@ -1096,17 +1319,15 @@ test "BufferPool filesize > frame_size * num_frames" { offset + FRAME_SIZE, ); - try std.testing.expectEqual(1, read_frame.frame_indices.len); + try std.testing.expectEqual(1, read_frame.buffer_pool_read.frame_refs.len); - const frame: []const u8 = bp.frames[ - read_frame.frame_indices[0] - ][0..bp.frames_metadata.size[ - read_frame.frame_indices[0] - ]]; + const frame: []const u8 = &bp.frames[ + read_frame.buffer_pool_read.frame_refs[0].index + ]; var frame2: [FRAME_SIZE]u8 = undefined; const bytes_read = try file.preadAll(&frame2, offset); - try std.testing.expectEqualSlices(u8, frame2[0..bytes_read], frame); + try std.testing.expectEqualSlices(u8, frame2[0..bytes_read], frame[0..bytes_read]); read_frame.deinit(allocator); } } @@ -1141,7 +1362,12 @@ test "BufferPool random read" { @min(file_size, range_start + num_frames * FRAME_SIZE), ); - if (try BufferPool.computeNumberofFrameIndices(range_start, range_end) > num_frames) { + errdefer std.debug.print( + "failed on case(reads={}): file[{}..{}]\n", + .{ reads, range_start, range_end }, + ); + + if (try BufferPool.computeNumberOfFrameIndices(range_start, range_end) > num_frames) { continue; } @@ -1149,7 +1375,7 @@ test "BufferPool random read" { defer read.deinit(gpa.allocator()); // check for equality with other impl - if (use_io_uring) { + if (USE_IO_URING) { var read2 = try bp.readBlocking( gpa.allocator(), file, @@ -1159,22 +1385,44 @@ test "BufferPool random read" { ); defer read2.deinit(gpa.allocator()); - try std.testing.expect( - read.first_frame_start_offset == read2.first_frame_start_offset, + try std.testing.expectEqual( + read.buffer_pool_read.first_frame_start_offset, + read2.buffer_pool_read.first_frame_start_offset, + ); + try std.testing.expectEqual( + read.buffer_pool_read.last_frame_end_offset, + 
read2.buffer_pool_read.last_frame_end_offset, ); - try std.testing.expect(read.last_frame_end_offset == read2.last_frame_end_offset); - try std.testing.expectEqualSlices(u32, read.frame_indices, read2.frame_indices); + + try std.testing.expectEqual( + read.buffer_pool_read.frame_refs.len, + read2.buffer_pool_read.frame_refs.len, + ); + + for ( + read.buffer_pool_read.frame_refs, + read2.buffer_pool_read.frame_refs, + ) |read_frameref, read2_frameref| { + try std.testing.expectEqual(read_frameref.index, read2_frameref.index); + } } - var total_bytes_read: u32 = 0; - for (read.frame_indices) |f_idx| total_bytes_read += bp.frames_metadata.size[f_idx]; + errdefer std.debug.print("failed with read: {}\n", .{read}); + const read_data_bp_iter = try allocator.alloc(u8, read.len()); defer allocator.free(read_data_bp_iter); { - var i: u32 = 0; var iter = read.iterator(); - while (iter.next()) |b| : (i += 1) read_data_bp_iter[i] = b; - if (i != read.len()) unreachable; + var start_offset: u32 = 0; + while (iter.nextFrame()) |frame_slice| { + @memcpy( + read_data_bp_iter[start_offset..][0..frame_slice.len], + frame_slice, + ); + start_offset += @intCast(frame_slice.len); + } + + try std.testing.expectEqual(read.len(), iter.bytes_read); } var iter = read.iterator(); @@ -1186,7 +1434,7 @@ test "BufferPool random read" { const read_data_bp_readall = try allocator.alloc(u8, read.len()); defer allocator.free(read_data_bp_readall); - try read.readAll(read_data_bp_readall); + read.readAll(read_data_bp_readall); const read_data_expected = try allocator.alloc(u8, range_end - range_start); defer allocator.free(read_data_expected); @@ -1205,3 +1453,83 @@ test "BufferPool random read" { try std.testing.expectEqual(preaded_bytes, read_data_bp_readall.len); } } + +test "AccountDataHandle bincode" { + const allocator = std.testing.allocator; + + const file = try std.fs.cwd().openFile("data/test-data/test_account_file", .{}); + defer file.close(); + const file_id = FileId.fromInt(1); + + const num_frames = 400; + + var bp = try BufferPool.init(allocator, num_frames); + defer bp.deinit(allocator); + + const file_size: u32 = @intCast((try file.stat()).size); + + const read = try bp.read(allocator, file, file_id, 0, file_size); + defer read.deinit(allocator); + + const read_data = try read.readAllAllocate(allocator); + defer allocator.free(read_data); + + { + var serialised_from_slice = std.ArrayList(u8).init(allocator); + defer serialised_from_slice.deinit(); + + try bincode.write(serialised_from_slice.writer(), read_data, .{}); + + const deserialised_from_slice = try bincode.readFromSlice( + allocator, + AccountDataHandle, + serialised_from_slice.items, + .{}, + ); + defer deserialised_from_slice.deinit(allocator); + + try std.testing.expectEqualSlices( + u8, + read_data, + deserialised_from_slice.owned_allocation, + ); + } + + { + var serialised_from_handle = std.ArrayList(u8).init(allocator); + defer serialised_from_handle.deinit(); + + try bincode.write(serialised_from_handle.writer(), read, .{}); + + const deserialised_from_handle = try bincode.readFromSlice( + allocator, + AccountDataHandle, + serialised_from_handle.items, + .{}, + ); + defer deserialised_from_handle.deinit(allocator); + + try std.testing.expectEqualSlices( + u8, + read_data, + deserialised_from_handle.owned_allocation, + ); + } +} + +test "BufferPool failed read" { + var bp = try BufferPool.init(std.testing.allocator, 1024); + defer bp.deinit(std.testing.allocator); + + const file = try std.fs.cwd().openFile("data/test-data/test_account_file", 
.{}); + + const read = try bp.read(std.testing.allocator, file, FileId.fromInt(0), 0, 1000); + read.deinit(std.testing.allocator); + + const read2 = bp.read(std.testing.failing_allocator, file, FileId.fromInt(0), 10_000, 11_000); + try std.testing.expectError(error.OutOfMemory, read2); + + file.close(); // close early => fail the read + const read3 = bp.read(std.testing.allocator, file, FileId.fromInt(0), 20_000, 31_000); + try std.testing.expectError(error.NotOpenForReading, read3); +} diff --git a/src/accountsdb/cache.zig b/src/accountsdb/cache.zig index 13ff539e0..d964f8868 100644 --- a/src/accountsdb/cache.zig +++ b/src/accountsdb/cache.zig @@ -9,6 +9,7 @@ const LruCacheCustom = sig.utils.lru.LruCacheCustom; const ReferenceCounter = sig.sync.reference_counter.ReferenceCounter; const Slot = sig.core.Slot; const Counter = sig.prometheus.counter.Counter; +const AccountDataHandle = sig.accounts_db.buffer_pool.AccountDataHandle; /// Stores read-only in-memory copies of commonly used *rooted* accounts pub const AccountsCache = struct { @@ -23,6 +24,8 @@ pub const AccountsCache = struct { /// the slot that this version of the account originates from slot: Slot, + const Buf = []align(@alignOf(CachedAccount)) u8; + /// Makes use of the fact that CachedAccount needs to live on the heap & shares its lifetime /// with .ref_count in order to allocate once instead of twice. pub fn initCreate( @@ -30,17 +33,18 @@ pub const AccountsCache = struct { account: Account, slot: Slot, ) error{OutOfMemory}!*CachedAccount { - const buf = try allocator.alignedAlloc( + const buf: Buf = try allocator.alignedAlloc( u8, @alignOf(CachedAccount), - @sizeOf(CachedAccount) + account.data.len, + @sizeOf(CachedAccount) + account.data.len(), ); const new_entry = @as(*CachedAccount, @ptrCast(buf.ptr)); const account_data = buf[@sizeOf(CachedAccount)..]; + account.data.readAll(account_data); + var new_account = account; - new_account.data = account_data; - @memcpy(new_account.data, account.data); + new_account.data = AccountDataHandle.initAllocated(account_data); new_entry.* = .{ .account = new_account, @@ -52,10 +56,10 @@ pub const AccountsCache = struct { } pub fn deinitDestroy(self: *CachedAccount, allocator: std.mem.Allocator) void { - const buf: []align(8) u8 = @as( - [*]align(8) u8, + const buf: Buf = @as( + [*]align(@alignOf(CachedAccount)) u8, @ptrCast(self), - )[0 .. @sizeOf(CachedAccount) + self.account.data.len]; + )[0 .. 
@sizeOf(CachedAccount) + self.account.data.len()]; @memset(buf, undefined); allocator.free(buf); } @@ -206,7 +210,7 @@ test "AccountsCache put and get account" { defer if (cached_account) |cached| cached.releaseOrDestroy(allocator); try std.testing.expect(cached_account != null); - try std.testing.expectEqualSlices(u8, account.data, cached_account.?.account.data); + try AccountDataHandle.expectEqual(account.data, cached_account.?.account.data); } test "AccountsCache returns null when account is missing" { diff --git a/src/accountsdb/db.zig b/src/accountsdb/db.zig index 9f27d0d91..70b4d8b2f 100644 --- a/src/accountsdb/db.zig +++ b/src/accountsdb/db.zig @@ -33,6 +33,8 @@ const SnapshotFiles = sig.accounts_db.snapshots.SnapshotFiles; const AccountIndex = sig.accounts_db.index.AccountIndex; const AccountRef = sig.accounts_db.index.AccountRef; +const BufferPool = sig.accounts_db.buffer_pool.BufferPool; +const AccountDataHandle = sig.accounts_db.buffer_pool.AccountDataHandle; const PubkeyShardCalculator = sig.accounts_db.index.PubkeyShardCalculator; const ShardedPubkeyRefMap = sig.accounts_db.index.ShardedPubkeyRefMap; @@ -108,6 +110,8 @@ pub const AccountsDB = struct { /// NOTE: see accountsdb/readme.md for more details on how these are used file_map_fd_rw: std.Thread.RwLock, + buffer_pool: BufferPool, + /// Tracks how many accounts (which we have stored) are dead for a specific slot. /// Used during clean to queue an AccountFile for shrink if it contains /// a large percentage of dead accounts, or deletion if the file contains only @@ -170,6 +174,8 @@ pub const AccountsDB = struct { gossip_view: ?GossipView, index_allocation: AccountIndex.AllocatorConfig.Tag, number_of_index_shards: usize, + /// Amount of BufferPool frames, used for cached reads. Default = 1GiB. + buffer_pool_frames: u32 = 2 * 1024 * 1024, /// Limit of cached accounts. Use null to disable accounts caching. 
lru_size: ?usize, }; @@ -203,6 +209,9 @@ pub const AccountsDB = struct { else => |e| return e, }; + const buffer_pool = try BufferPool.init(params.allocator, params.buffer_pool_frames); + errdefer buffer_pool.deinit(params.allocator); + return .{ .allocator = params.allocator, .metrics = metrics, @@ -219,6 +228,7 @@ pub const AccountsDB = struct { .maybe_accounts_cache_rw = if (maybe_accounts_cache) |cache| RwMux(AccountsCache).init(cache) else null, .file_map = RwMux(FileMap).init(.{}), .file_map_fd_rw = .{}, + .buffer_pool = buffer_pool, .dead_accounts_counter = RwMux(DeadAccountsCounter).init(DeadAccountsCounter.init(params.allocator)), .largest_file_id = FileId.fromInt(0), @@ -234,6 +244,7 @@ pub const AccountsDB = struct { pub fn deinit(self: *Self) void { self.account_index.deinit(); + self.buffer_pool.deinit(self.allocator); if (self.maybe_accounts_cache_rw) |*accounts_cache_rw| { const accounts_cache, var accounts_cache_lg = accounts_cache_rw.writeWithLock(); @@ -256,6 +267,7 @@ pub const AccountsDB = struct { { const file_map, var file_map_lg = self.file_map.writeWithLock(); defer file_map_lg.unlock(); + for (file_map.values()) |v| v.deinit(); file_map.deinit(self.allocator); } { @@ -373,7 +385,7 @@ pub const AccountsDB = struct { self.logger.err().logf("Failed to open accounts/{s}: {s}", .{ file_name_bounded.constSlice(), @errorName(err) }); return err; }; - defer accounts_file.close(); + errdefer accounts_file.close(); break :blk AccountFile.init(accounts_file, file_info, slot) catch |err| { self.logger.err().logf("failed to *open* AccountsFile {s}: {s}\n", .{ file_name_bounded.constSlice(), @errorName(err) }); @@ -525,6 +537,7 @@ pub const AccountsDB = struct { defer accounts_cache_lg.unlock(); accounts_cache.deinit(); } + loading_thread.buffer_pool.deinit(per_thread_allocator); } } @@ -601,7 +614,7 @@ pub const AccountsDB = struct { } defer { if (geyser_slot_storage) |storage| { - storage.deinit(); + storage.deinit(self.allocator); self.allocator.destroy(storage); } } @@ -623,7 +636,7 @@ pub const AccountsDB = struct { }); return err; }; - defer accounts_file.close(); + errdefer accounts_file.close(); break :blk AccountFile.init(accounts_file, file_info, slot) catch |err| { self.logger.err().logf("failed to *open* AccountsFile {s}: {s}\n", .{ @@ -633,13 +646,16 @@ pub const AccountsDB = struct { return err; }; }; - errdefer accounts_file.deinit(); + var accounts_file_moved_to_filemap = false; + defer if (!accounts_file_moved_to_filemap) accounts_file.deinit(); // index the account file var slot_references = std.ArrayListUnmanaged(AccountRef).initBuffer( references_buf[n_accounts_total..], ); indexAndValidateAccountFile( + self.allocator, + &self.buffer_pool, &accounts_file, self.account_index.pubkey_ref_map.shard_calculator, shard_counts, @@ -661,6 +677,10 @@ pub const AccountsDB = struct { if (n_accounts_this_slot == 0) { continue; } + const file_id = file_info.id; + file_map.putAssumeCapacityNoClobber(file_id, accounts_file); + accounts_file_moved_to_filemap = true; + // track slice of references per slot n_accounts_total += n_accounts_this_slot; slot_reference_map.putAssumeCapacityNoClobber( @@ -674,7 +694,7 @@ pub const AccountsDB = struct { const geyser_writer = self.geyser_writer.?; // SAFE: will always be set if geyser_is_enabled // ! 
reset memory for the next slot - defer geyser_storage.reset(); + defer geyser_storage.reset(self.allocator); const data_versioned: sig.geyser.core.VersionedAccountPayload = .{ .AccountPayloadV1 = .{ @@ -686,8 +706,6 @@ pub const AccountsDB = struct { try geyser_writer.writePayloadToPipe(data_versioned); } - const file_id = file_info.id; - file_map.putAssumeCapacityNoClobber(file_id, accounts_file); self.largest_file_id = FileId.max(self.largest_file_id, file_id); _ = self.largest_rooted_slot.fetchMax(slot, .release); self.largest_flushed_slot.store(self.largest_rooted_slot.load(.acquire), .release); @@ -1193,17 +1211,26 @@ pub const AccountsDB = struct { // hashes aren't always stored correctly in snapshots if (account_hash.eql(Hash.ZEROES)) { const account, var lock_guard = try self.getAccountFromRefWithReadLock(max_slot_ref); - defer lock_guard.unlock(); + defer { + lock_guard.unlock(); + switch (account) { + .file => |in_file_account| in_file_account.deinit(self.allocator), + .unrooted_map => {}, + } + } account_hash = switch (account) { - .file => |in_file_account| sig.core.account.hashAccount( - in_file_account.lamports().*, - in_file_account.data, - &in_file_account.owner().data, - in_file_account.executable().*, - in_file_account.rent_epoch().*, - &in_file_account.pubkey().data, - ), + .file => |in_file_account| blk: { + var iter = in_file_account.data.iterator(); + break :blk sig.core.account.hashAccount( + in_file_account.lamports().*, + &iter, + &in_file_account.owner().data, + in_file_account.executable().*, + in_file_account.rent_epoch().*, + &in_file_account.pubkey().data, + ); + }, .unrooted_map => |unrooted_account| unrooted_account.hash(&key), }; } @@ -1467,7 +1494,7 @@ pub const AccountsDB = struct { } const file, const file_id = try self.createAccountFile(size, slot); - defer file.close(); + errdefer file.close(); const offsets = try self.allocator.alloc(u64, accounts.len); defer self.allocator.free(offsets); @@ -1542,7 +1569,7 @@ pub const AccountsDB = struct { // free slices for (accounts) |account| { - self.allocator.free(account.data); + account.data.deinit(self.allocator); } self.allocator.free(accounts); self.allocator.free(pubkeys); @@ -1601,8 +1628,9 @@ pub const AccountsDB = struct { break :blk file_map.get(file_id).?; }; - var account_iter = account_file.iterator(); - while (account_iter.next()) |account| { + var account_iter = account_file.iterator(self.allocator, &self.buffer_pool); + while (try account_iter.nextNoData()) |account| { + defer account.deinit(self.allocator); const pubkey = account.pubkey().*; // check if already cleaned @@ -1846,8 +1874,10 @@ pub const AccountsDB = struct { var accounts_alive_size: u64 = 0; var accounts_dead_size: u64 = 0; - var account_iter = shrink_account_file.iterator(); - while (account_iter.next()) |*account_in_file| { + var account_iter = shrink_account_file.iterator(self.allocator, &self.buffer_pool); + while (try account_iter.nextNoData()) |*account_in_file| { + defer account_in_file.deinit(self.allocator); + const pubkey = account_in_file.pubkey(); // account is dead if it is not in the index; dead accounts // are removed from the index during cleaning @@ -1881,16 +1911,17 @@ pub const AccountsDB = struct { accounts_alive_size, slot, ); - defer new_file.close(); + // don't close file if it ends up in file_map + var new_file_in_map = false; + defer if (!new_file_in_map) new_file.close(); var file_size: usize = 0; account_iter.reset(); for (is_alive_flags.items) |is_alive| { // SAFE: we know is_alive_flags is the same 
length as the account_iter - const account = account_iter.next().?; - if (is_alive) { - file_size += account.getSizeInFile(); - } + const account = (try account_iter.nextNoData()).?; + defer account.deinit(self.allocator); + if (is_alive) file_size += account.getSizeInFile(); } var account_file_buf = std.ArrayList(u8).init(self.allocator); @@ -1904,7 +1935,8 @@ pub const AccountsDB = struct { var offset: usize = 0; for (is_alive_flags.items) |is_alive| { // SAFE: we know is_alive_flags is the same length as the account_iter - const account = account_iter.next().?; + const account = (try account_iter.next()).?; + defer account.deinit(self.allocator); if (is_alive) { try account_file_buf.resize(account.getSizeInFile()); offsets.appendAssumeCapacity(offset); @@ -1927,6 +1959,7 @@ pub const AccountsDB = struct { new_account_file.number_of_accounts = accounts_alive_count; file_map.putAssumeCapacityNoClobber(new_file_id, new_account_file); + new_file_in_map = true; } // update the references @@ -1937,7 +1970,8 @@ pub const AccountsDB = struct { var offset_index: u64 = 0; for (is_alive_flags.items) |is_alive| { // SAFE: we know is_alive_flags is the same length as the account_iter - const account = account_iter.next().?; + const account = (try account_iter.nextNoData()).?; + defer account.deinit(self.allocator); if (is_alive) { // find the slot in the reference list const pubkey = account.pubkey(); @@ -2071,7 +2105,7 @@ pub const AccountsDB = struct { }; if (maybe_cached_account) |cached_account| { - const account = try cached_account.account.clone(self.allocator); + const account = try cached_account.account.cloneOwned(self.allocator); cached_account.releaseOrDestroy(self.allocator); return account; } else { @@ -2080,6 +2114,7 @@ pub const AccountsDB = struct { ref_info.file_id, ref_info.offset, ); + errdefer account.deinit(self.allocator); if (self.maybe_accounts_cache_rw) |*accounts_cache_rw| { const accounts_cache, var accounts_cache_lg = accounts_cache_rw.writeWithLock(); @@ -2097,7 +2132,7 @@ pub const AccountsDB = struct { _, const accounts = unrooted_accounts.get(account_ref.slot) orelse return error.SlotNotFound; const account = accounts[ref_info.index]; - return account.clone(self.allocator); + return try account.cloneOwned(self.allocator); }, } } @@ -2124,6 +2159,8 @@ pub const AccountsDB = struct { switch (account_ref.location) { .File => |ref_info| { const account = try self.getAccountInFileAndLock( + self.allocator, + &self.buffer_pool, ref_info.file_id, ref_info.offset, ); @@ -2156,9 +2193,15 @@ pub const AccountsDB = struct { file_id: FileId, offset: usize, ) (GetAccountInFileError || std.mem.Allocator.Error)!Account { - const account_in_file = try self.getAccountInFileAndLock(file_id, offset); + const account_in_file = try self.getAccountInFileAndLock( + self.allocator, + &self.buffer_pool, + file_id, + offset, + ); + defer account_in_file.deinit(account_allocator); defer self.file_map_fd_rw.unlockShared(); - return try account_in_file.toOwnedAccount(account_allocator); + return try account_in_file.dupeCachedAccount(account_allocator); } /// Gets an account given an file_id and offset value. @@ -2167,12 +2210,19 @@ pub const AccountsDB = struct { /// when done with the account. 
pub fn getAccountInFileAndLock( self: *Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, file_id: FileId, offset: usize, ) GetAccountInFileError!AccountInFile { self.file_map_fd_rw.lockShared(); errdefer self.file_map_fd_rw.unlockShared(); - return try self.getAccountInFileAssumeLock(file_id, offset); + return try self.getAccountInFileAssumeLock( + metadata_allocator, + buffer_pool, + file_id, + offset, + ); } /// Gets an account given a file_id and an offset value. @@ -2180,6 +2230,8 @@ pub const AccountsDB = struct { /// locked for reading (shared). pub fn getAccountInFileAssumeLock( self: *Self, + metadata_allocator: std.mem.Allocator, + buffer_pool: *BufferPool, file_id: FileId, offset: usize, ) GetAccountInFileError!AccountInFile { @@ -2188,7 +2240,11 @@ pub const AccountsDB = struct { defer file_map_lg.unlock(); break :blk file_map.get(file_id) orelse return error.FileIdNotFound; }; - return account_file.readAccount(offset) catch error.InvalidOffset; + return account_file.readAccount( + metadata_allocator, + buffer_pool, + offset, + ) catch error.InvalidOffset; } pub fn getAccountHashAndLamportsFromRef( @@ -2207,12 +2263,14 @@ pub const AccountsDB = struct { }; const result = account_file.getAccountHashAndLamports( + self.allocator, + &self.buffer_pool, ref_info.offset, ) catch return error.InvalidOffset; return .{ - result.hash.*, - result.lamports.*, + result.hash, + result.lamports, }; }, // we dont use this method for cache @@ -2265,13 +2323,21 @@ pub const AccountsDB = struct { ) GetTypeFromAccountError!T { const account, var lock_guard = try self.getAccountWithReadLock(pubkey); // NOTE: bincode will copy heap memory so its safe to unlock at the end of the function - defer lock_guard.unlock(); + defer { + switch (account) { + .file => |file| file.deinit(allocator), + .unrooted_map => {}, + } + lock_guard.unlock(); + } - const file_data: []const u8 = switch (account) { + const file_data: AccountDataHandle = switch (account) { .file => |in_file_account| in_file_account.data, .unrooted_map => |unrooted_map_account| unrooted_map_account.data, }; - const t = sig.bincode.readFromSlice(allocator, T, file_data, .{}) catch { + + var iter = file_data.iterator(); + const t = sig.bincode.read(allocator, T, iter.reader(), .{}) catch { return error.DeserializationError; }; return t; @@ -2301,6 +2367,8 @@ pub const AccountsDB = struct { var references = std.ArrayListUnmanaged(AccountRef).initBuffer(reference_buf); try indexAndValidateAccountFile( + self.allocator, + &self.buffer_pool, account_file, self.account_index.pubkey_ref_map.shard_calculator, shard_counts, @@ -2326,13 +2394,18 @@ pub const AccountsDB = struct { // we update the bank hash stats while locking the file map to avoid // reading accounts from the file map and getting inaccurate/stale // bank hash stats. - var account_iter = account_file.iterator(); - while (account_iter.next()) |account_in_file| { + var account_iter = account_file.iterator( + self.allocator, + &self.buffer_pool, + ); + while (try account_iter.next()) |account_in_file| { + defer account_in_file.deinit(self.allocator); + const bhs, var bhs_lg = try self.getOrInitBankHashStats(account_file.slot); defer bhs_lg.unlock(); bhs.update(.{ .lamports = account_in_file.lamports().*, - .data_len = account_in_file.data.len, + .data_len = account_in_file.data.len(), .executable = account_in_file.executable().*, }); } @@ -2395,13 +2468,13 @@ pub const AccountsDB = struct { for (accounts_duped, accounts, 0..) 
|*account, original, i| { errdefer for (accounts_duped[0..i]) |prev| prev.deinit(self.allocator); - account.* = try original.clone(self.allocator); + account.* = try original.cloneOwned(self.allocator); const bhs, var bhs_lg = try self.getOrInitBankHashStats(slot); defer bhs_lg.unlock(); bhs.update(.{ .lamports = account.lamports, - .data_len = account.data.len, + .data_len = account.data.len(), .executable = account.executable, }); } @@ -3086,6 +3159,7 @@ pub const GeyserTmpStorage = struct { pub const Error = error{ OutOfGeyserFBAMemory, OutOfGeyserArrayMemory, + OutOfMemory, // feels odd adding this, but unfamiliar with geyser - sebastian }; pub fn init(allocator: std.mem.Allocator, n_accounts_estimate: usize) !Self { @@ -3095,21 +3169,29 @@ pub const GeyserTmpStorage = struct { }; } - pub fn deinit(self: *Self) void { + pub fn deinit( + self: *Self, + allocator: std.mem.Allocator, + ) void { + for (self.accounts.items) |account| account.deinit(allocator); self.accounts.deinit(); self.pubkeys.deinit(); } - pub fn reset(self: *Self) void { + pub fn reset( + self: *Self, + allocator: std.mem.Allocator, + ) void { + for (self.accounts.items) |account| account.deinit(allocator); self.accounts.clearRetainingCapacity(); self.pubkeys.clearRetainingCapacity(); } - pub fn cloneAndTrack(self: *Self, account_in_file: AccountInFile) Error!void { - // NOTE: this works because we mmap the account files - this will not work once we remove mmaps - const account = account_in_file.toAccount(); - + pub fn cloneAndTrack(self: *Self, allocator: std.mem.Allocator, account_in_file: AccountInFile) Error!void { + const account = try account_in_file.dupeCachedAccount(allocator); + errdefer account.deinit(allocator); self.accounts.append(account) catch return Error.OutOfGeyserArrayMemory; + errdefer _ = self.accounts.pop(); self.pubkeys.append(account_in_file.pubkey().*) catch return Error.OutOfGeyserArrayMemory; } }; @@ -3119,9 +3201,11 @@ pub const ValidateAccountFileError = error{ InvalidAccountFileLength, OutOfMemory, OutOfReferenceMemory, -} || AccountInFile.ValidateError || GeyserTmpStorage.Error; +} || AccountInFile.ValidateError || GeyserTmpStorage.Error || BufferPool.ReadError; pub fn indexAndValidateAccountFile( + allocator: std.mem.Allocator, + buffer_pool: *BufferPool, accounts_file: *AccountFile, shard_calculator: PubkeyShardCalculator, shard_counts: []usize, @@ -3136,11 +3220,20 @@ pub fn indexAndValidateAccountFile( } while (true) { - const account = accounts_file.readAccount(offset) catch break; + const account = accounts_file.readAccount( + allocator, + buffer_pool, + offset, + ) catch |err| switch (err) { + error.EOF => break, + else => |e| return e, + }; + defer account.deinit(allocator); + try account.validate(); if (geyser_storage) |storage| { - try storage.cloneAndTrack(account); + try storage.cloneAndTrack(allocator, account); } if (account_refs.capacity == account_refs.items.len) { @@ -3202,8 +3295,12 @@ pub fn writeSnapshotTarWithFields( std.debug.assert(account_file.length == file_info.length); try snapgen.writeAccountFileHeader(archive_writer_counted, file_slot, file_info); - try archive_writer_counted.writeAll(account_file.memory); - try snapgen.writeAccountFilePadding(archive_writer_counted, account_file.memory.len); + + try account_file.file.seekTo(0); + var fifo = std.fifo.LinearFifo(u8, .{ .Static = std.mem.page_size }).init(); + try fifo.pump(account_file.file.reader(), archive_writer_counted); + + try snapgen.writeAccountFilePadding(archive_writer_counted, 
account_file.file_size); } try archive_writer_counted.writeAll(&sig.utils.tar.sentinel_blocks); @@ -3534,7 +3631,7 @@ test "write and read an account" { const pubkey = Pubkey.initRandom(prng.random()); var data = [_]u8{ 1, 2, 3 }; const test_account = Account{ - .data = &data, + .data = AccountDataHandle.initAllocated(&data), .executable = false, .lamports = 100, .owner = Pubkey.ZEROES, @@ -3662,19 +3759,16 @@ test "load clock sysvar" { .leader_schedule_epoch = 1, .unix_timestamp = 1733350255, }; - try std.testing.expectEqual(expected_clock, try accounts_db.getTypeFromAccount(allocator, sysvars.Clock, &sysvars.IDS.clock)); + + const found_clock = try accounts_db.getTypeFromAccount(allocator, sysvars.Clock, &sysvars.IDS.clock); + + try std.testing.expectEqual(expected_clock, found_clock); } test "load other sysvars" { var gpa_state: std.heap.GeneralPurposeAllocator(.{ .stack_trace_frames = 64 }) = .{}; defer _ = gpa_state.deinit(); const allocator = gpa_state.allocator(); - // const allocator = std.testing.allocator; - const panic_allocator = sig.utils.allocators.failing.allocator(.{ - .alloc = .panics, - .resize = .panics, - .free = .panics, - }); var tmp_dir_root = std.testing.tmpDir(.{}); defer tmp_dir_root.cleanup(); @@ -3688,9 +3782,9 @@ test "load other sysvars" { } const SlotAndHash = sig.core.hash.SlotAndHash; - _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.EpochSchedule, &sysvars.IDS.epoch_schedule); - _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.Rent, &sysvars.IDS.rent); - _ = try accounts_db.getTypeFromAccount(panic_allocator, SlotAndHash, &sysvars.IDS.slot_hashes); + _ = try accounts_db.getTypeFromAccount(allocator, sysvars.EpochSchedule, &sysvars.IDS.epoch_schedule); + _ = try accounts_db.getTypeFromAccount(allocator, sysvars.Rent, &sysvars.IDS.rent); + _ = try accounts_db.getTypeFromAccount(allocator, SlotAndHash, &sysvars.IDS.slot_hashes); const stake_history = try accounts_db.getTypeFromAccount(allocator, sysvars.StakeHistory, &sysvars.IDS.stake_history); defer sig.bincode.free(allocator, stake_history); @@ -3699,14 +3793,17 @@ test "load other sysvars" { defer sig.bincode.free(allocator, slot_history); // // not always included in local snapshot - // _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.LastRestartSlot, &sysvars.IDS.last_restart_slot); - // _ = try accounts_db.getTypeFromAccount(panic_allocator, sysvars.EpochRewards, &sysvars.IDS.epoch_rewards); + // _ = try accounts_db.getTypeFromAccount(allocator, sysvars.LastRestartSlot, &sysvars.IDS.last_restart_slot); + // _ = try accounts_db.getTypeFromAccount(allocator, sysvars.EpochRewards, &sysvars.IDS.epoch_rewards); } test "flushing slots works" { const allocator = std.testing.allocator; const logger: Logger = .noop; + var bp = try BufferPool.init(allocator, 100); + defer bp.deinit(allocator); + var tmp_dir_root = std.testing.tmpDir(.{}); defer tmp_dir_root.cleanup(); const snapshot_dir = tmp_dir_root.dir; @@ -3758,7 +3855,10 @@ test "flushing slots works" { const file_id = file_map.keys()[0]; const account_file = file_map.getPtr(file_id).?; - account_file.number_of_accounts = try account_file.validate(); + account_file.number_of_accounts = try account_file.validate( + allocator, + &bp, + ); try std.testing.expect(account_file.number_of_accounts == n_accounts); try std.testing.expect(unclean_account_files.items.len == 1); @@ -4188,7 +4288,7 @@ test "shrink account file works" { const account_file = file_map.get(file_id).?; if (account_file.slot == slot) break 
file_id; } else return error.NoSlotFile; - break :blk file_map.get(slot_file_id).?.memory.len; + break :blk file_map.get(slot_file_id).?.length; }; // full memory block @@ -4232,7 +4332,7 @@ test "shrink account file works" { }; const new_account_file = file_map2.get(new_slot_file_id).?; - const post_shrink_size = new_account_file.memory.len; + const post_shrink_size = new_account_file.length; try std.testing.expect(post_shrink_size < pre_shrink_size); } @@ -4777,7 +4877,10 @@ pub const BenchmarkAccountsDB = struct { }, .disk => { var account_files = try ArrayList(AccountFile).initCapacity(allocator, slot_list_len); - defer account_files.deinit(); + defer { + // don't deinit each account_file here - they are taken by putAccountFile + account_files.deinit(); + } for (0..(slot_list_len + bench_args.n_accounts_multiple)) |s| { var size: usize = 0; @@ -4796,7 +4899,7 @@ pub const BenchmarkAccountsDB = struct { var account_file = blk: { var file = try disk_dir.dir.createFile(filepath, .{ .read = true }); - defer file.close(); + errdefer file.close(); // resize the file const file_size = (try file.stat()).size; @@ -4812,7 +4915,7 @@ pub const BenchmarkAccountsDB = struct { var offset: usize = 0; for (0..n_accounts) |i| { const account = try Account.initRandom(allocator, random, i % 1_000); - defer allocator.free(account.data); + defer account.deinit(allocator); var pubkey = pubkeys[i % n_accounts]; offset += account.writeToBuf(&pubkey, buf[offset..]); } @@ -4821,7 +4924,6 @@ pub const BenchmarkAccountsDB = struct { break :blk try AccountFile.init(file, .{ .id = FileId.fromInt(@intCast(s)), .length = offset }, s); }; - errdefer account_file.deinit(); if (s < bench_args.n_accounts_multiple) { try accounts_db.putAccountFile(&account_file, n_accounts); @@ -4876,8 +4978,8 @@ pub const BenchmarkAccountsDB = struct { const pubkey_idx = indexer.sample(); const account = try accounts_db.getAccount(&pubkeys[pubkey_idx]); defer account.deinit(allocator); - if (account.data.len != (pubkey_idx % 1_000)) { - std.debug.panic("account data len dnm {}: {} != {}", .{ pubkey_idx, account.data.len, (pubkey_idx % 1_000) }); + if (account.data.len() != (pubkey_idx % 1_000)) { + std.debug.panic("account data len dnm {}: {} != {}", .{ pubkey_idx, account.data.len(), (pubkey_idx % 1_000) }); } } const read_time = timer.read(); diff --git a/src/accountsdb/fuzz.zig b/src/accountsdb/fuzz.zig index 44a48d139..18f431b4a 100644 --- a/src/accountsdb/fuzz.zig +++ b/src/accountsdb/fuzz.zig @@ -11,6 +11,7 @@ const Pubkey = sig.core.pubkey.Pubkey; const BankFields = sig.accounts_db.snapshots.BankFields; const FullSnapshotFileInfo = sig.accounts_db.snapshots.FullSnapshotFileInfo; const IncrementalSnapshotFileInfo = sig.accounts_db.snapshots.IncrementalSnapshotFileInfo; +const AccountDataHandle = sig.accounts_db.buffer_pool.AccountDataHandle; pub const TrackedAccount = struct { pubkey: Pubkey, @@ -30,7 +31,7 @@ pub const TrackedAccount = struct { pub fn toAccount(self: *const TrackedAccount, allocator: std.mem.Allocator) !Account { return .{ .lamports = 19, - .data = try allocator.dupe(u8, &self.data), + .data = AccountDataHandle.initAllocatedOwned(try allocator.dupe(u8, &self.data)), .owner = Pubkey.ZEROES, .executable = false, .rent_epoch = 0, @@ -218,7 +219,7 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void { try accounts_db.getAccountAndReference(&tracked_account.pubkey); defer account.deinit(allocator); - if (!std.mem.eql(u8, &tracked_account.data, account.data)) { + if 
(!account.data.eqlSlice(&tracked_account.data)) { std.debug.panic( "found account {} with different data: " ++ "tracked: {any} vs found: {any} ({})\n", diff --git a/src/accountsdb/snapshots.zig b/src/accountsdb/snapshots.zig index 125869872..47a92ba1d 100644 --- a/src/accountsdb/snapshots.zig +++ b/src/accountsdb/snapshots.zig @@ -280,7 +280,7 @@ pub const VoteAccount = struct { vote_account: VoteAccount, allocator: std.mem.Allocator, ) std.mem.Allocator.Error!VoteAccount { - const account = try vote_account.account.clone(allocator); + const account = try vote_account.account.cloneOwned(allocator); errdefer account.deinit(allocator); return .{ .account = account, @@ -297,10 +297,12 @@ pub const VoteAccount = struct { .resize = .assert, .free = .assert, }); - const vote_state = bincode.readFromSlice( + + var data_iter = self.account.data.iterator(); + const vote_state = bincode.read( assert_alloc, VoteState, - self.account.data, + data_iter.reader(), .{}, ); self.vote_state = vote_state; diff --git a/src/core/account.zig b/src/core/account.zig index 7ed1b1559..25f1bce8f 100644 --- a/src/core/account.zig +++ b/src/core/account.zig @@ -3,21 +3,25 @@ const sig = @import("../sig.zig"); const Pubkey = sig.core.Pubkey; const Epoch = sig.core.Epoch; const AccountInFile = sig.accounts_db.accounts_file.AccountInFile; +const AccountDataHandle = sig.accounts_db.buffer_pool.AccountDataHandle; pub const Account = struct { lamports: u64, - data: []u8, + data: AccountDataHandle, owner: Pubkey, executable: bool, rent_epoch: Epoch, pub fn deinit(self: Account, allocator: std.mem.Allocator) void { - allocator.free(self.data); + self.data.deinit(allocator); } pub fn initRandom(allocator: std.mem.Allocator, random: std.Random, data_len: usize) !Account { - const data = try allocator.alloc(u8, data_len); - random.bytes(data); + const data_buf = try allocator.alloc(u8, data_len); + errdefer allocator.free(data_buf); + + random.bytes(data_buf); + const data = AccountDataHandle.initAllocatedOwned(data_buf); return .{ .lamports = random.int(u64), @@ -29,11 +33,21 @@ pub const Account = struct { } // creates a copy of the account. most important is the copy of the data slice. 
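+    // the returned Account owns a heap copy of the data; release it with deinit.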
- pub fn clone(self: *const Account, allocator: std.mem.Allocator) !Account { - const data = try allocator.dupe(u8, self.data); + pub fn cloneOwned(self: *const Account, allocator: std.mem.Allocator) !Account { return .{ .lamports = self.lamports, - .data = data, + .data = try self.data.dupeAllocatedOwned(allocator), + .owner = self.owner, + .executable = self.executable, + .rent_epoch = self.rent_epoch, + }; + } + + // creates a cheap borrow of an already-cached account + pub fn cloneCached(self: *const Account, allocator: std.mem.Allocator) !Account { + return .{ + .lamports = self.lamports, + .data = try self.data.duplicateBufferPoolRead(allocator), .owner = self.owner, .executable = self.executable, .rent_epoch = self.rent_epoch, @@ -41,7 +55,7 @@ pub const Account = struct { } pub fn equals(self: *const Account, other: *const Account) bool { - return std.mem.eql(u8, self.data, other.data) and + return self.data.eql(other.data) and self.lamports == other.lamports and self.owner.equals(&other.owner) and self.executable == other.executable and @@ -52,16 +66,17 @@ pub const Account = struct { pub fn getSizeInFile(self: *const Account) usize { return std.mem.alignForward( usize, - AccountInFile.STATIC_SIZE + self.data.len, + AccountInFile.STATIC_SIZE + self.data.len(), @sizeOf(u64), ); } /// computes the hash of the account pub fn hash(self: *const Account, pubkey: *const Pubkey) Hash { + var iter = self.data.iterator(); return hashAccount( self.lamports, - self.data, + &iter, &self.owner.data, self.executable, self.rent_epoch, @@ -75,7 +90,7 @@ pub const Account = struct { const storage_info = AccountInFile.StorageInfo{ .write_version_obsolete = 0, - .data_len = self.data.len, + .data_len = self.data.len(), .pubkey = pubkey.*, }; offset += storage_info.writeToBuf(buf[offset..]); @@ -93,8 +108,9 @@ pub const Account = struct { offset += 32; offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); - @memcpy(buf[offset..(offset + self.data.len)], self.data); - offset += self.data.len; + self.data.readAll(buf[offset..][0..self.data.len()]); + + offset += self.data.len(); offset = std.mem.alignForward(usize, offset, @sizeOf(u64)); return offset; @@ -118,7 +134,7 @@ const Hash = @import("hash.zig").Hash; pub fn hashAccount( lamports: u64, - data: []const u8, + data: *AccountDataHandle.Iterator, owner_pubkey_data: []const u8, executable: bool, rent_epoch: u64, @@ -134,7 +150,9 @@ pub fn hashAccount( std.mem.writeInt(u64, &int_buf, rent_epoch, .little); hasher.update(&int_buf); - hasher.update(data); + while (data.nextFrame()) |frame_slice| { + hasher.update(frame_slice); + } if (executable) { hasher.update(&[_]u8{1}); @@ -157,16 +175,17 @@ test "core.account: test account hash matches rust" { var data: [3]u8 = .{ 1, 2, 3 }; var account: Account = .{ .lamports = 10, - .data = &data, + .data = AccountDataHandle.initAllocated(&data), .owner = Pubkey.ZEROES, .executable = false, .rent_epoch = 20, }; const pubkey = Pubkey.ZEROES; + var iter = account.data.iterator(); const hash = hashAccount( account.lamports, - account.data, + &iter, &account.owner.data, account.executable, account.rent_epoch, diff --git a/src/geyser/main.zig b/src/geyser/main.zig index 7ca140de0..39d67b9be 100644 --- a/src/geyser/main.zig +++ b/src/geyser/main.zig @@ -264,7 +264,7 @@ pub fn csvDump() !void { continue; } } - fmt_count += 120 + 5 * account.data.len; + fmt_count += 120 + 5 * account.data.len(); } const csv_string = try recycle_fba.allocator().alloc(u8, fmt_count);