Skip to content

Commit

Permalink
triedb/pathdb, eth: introduce Double-Buffer Mechanism in PathDB
Browse files Browse the repository at this point in the history
Previously, PathDB used a single buffer to aggregate database writes,
which needed to be flushed atomically. However, flushing large amounts
of data (e.g., 256MB) caused significant overhead, often blocking
the system for around 3 seconds during the flush.

To mitigate this overhead and reduce performance spikes, a double-buffer
mechanism is introduced. When the active buffer fills up, it is marked
as frozen and a background flushing process is triggered. Meanwhile, a
new buffer is allocated for incoming writes, allowing operations to
continue uninterrupted.

This approach reduces system blocking times and provides flexibility
in adjusting buffer parameters for improved performance.
  • Loading branch information
rjl493456442 committed Sep 19, 2024
1 parent 8032b63 commit 569b961
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 105 deletions.
5 changes: 0 additions & 5 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
Expand All @@ -41,7 +40,6 @@ import (
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/triedb/pathdb"
)

const (
Expand Down Expand Up @@ -558,7 +556,4 @@ func (h *handler) enableSyncedFeatures() {
log.Info("Snap sync complete, auto disabling")
h.snapSync.Store(false)
}
if h.chain.TrieDB().Scheme() == rawdb.PathScheme {
h.chain.TrieDB().SetBufferSize(pathdb.DefaultBufferSize)
}
}
11 changes: 0 additions & 11 deletions triedb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,17 +314,6 @@ func (db *Database) Journal(root common.Hash) error {
return pdb.Journal(root)
}

// SetBufferSize sets the node buffer size to the provided value(in bytes).
// It's only supported by path-based database and will return an error for
// others.
func (db *Database) SetBufferSize(size int) error {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return errors.New("not supported")
}
return pdb.SetBufferSize(size)
}

// IsVerkle returns the indicator if the database is holding a verkle tree.
func (db *Database) IsVerkle() bool {
return db.config.IsVerkle
Expand Down
29 changes: 11 additions & 18 deletions triedb/pathdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,12 @@ const (
// support is 4GB, node will panic if batch size exceeds this limit.
maxBufferSize = 256 * 1024 * 1024

// DefaultBufferSize is the default memory allowance of node buffer
// defaultBufferSize is the default memory allowance of node buffer
// that aggregates the writes from above until it's flushed into the
// disk. It's meant to be used once the initial sync is finished.
// Do not increase the buffer size arbitrarily, otherwise the system
// pause time will increase when the database writes happen.
DefaultBufferSize = 64 * 1024 * 1024
defaultBufferSize = 64 * 1024 * 1024
)

var (
Expand Down Expand Up @@ -111,7 +111,7 @@ func (c *Config) sanitize() *Config {
var Defaults = &Config{
StateHistory: params.FullImmutabilityThreshold,
CleanCacheSize: defaultCleanSize,
DirtyCacheSize: DefaultBufferSize,
DirtyCacheSize: defaultBufferSize,
}

// ReadOnly is the config in order to open database in read only mode.
Expand Down Expand Up @@ -341,7 +341,7 @@ func (db *Database) Enable(root common.Hash) error {
}
// Re-construct a new disk layer backed by persistent state
// with **empty clean cache and node buffer**.
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0)))
db.tree.reset(newDiskLayer(root, 0, db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil))

// Re-enable the database as the final step.
db.waitSync = false
Expand Down Expand Up @@ -440,7 +440,13 @@ func (db *Database) Close() error {
db.readOnly = true

// Release the memory held by clean cache.
db.tree.bottom().resetCache()
disk := db.tree.bottom()
if disk.frozen != nil {
if err := disk.frozen.flushed(); err != nil {
return err
}
}
disk.resetCache()

// Close the attached state history freezer.
if db.freezer == nil {
Expand Down Expand Up @@ -478,19 +484,6 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
return inited
}

// SetBufferSize sets the node buffer size to the provided value(in bytes).
func (db *Database) SetBufferSize(size int) error {
db.lock.Lock()
defer db.lock.Unlock()

if size > maxBufferSize {
log.Info("Capped node buffer size", "provided", common.StorageSize(size), "adjusted", common.StorageSize(maxBufferSize))
size = maxBufferSize
}
db.bufferSize = size
return db.tree.bottom().setBufferSize(db.bufferSize)
}

// modifyAllowed returns the indicator if mutation is allowed. This function
// assumes the db.lock is already held.
func (db *Database) modifyAllowed() error {
Expand Down
4 changes: 2 additions & 2 deletions triedb/pathdb/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (dl *diffLayer) update(root common.Hash, id uint64, block uint64, nodes map
}

// persist flushes the diff layer and all its parent layers to disk layer.
func (dl *diffLayer) persist(force bool) (layer, error) {
func (dl *diffLayer) persist(force bool) (*diskLayer, error) {
if parent, ok := dl.parentLayer().(*diffLayer); ok {
// Hold the lock to prevent any read operation until the new
// parent is linked correctly.
Expand All @@ -147,7 +147,7 @@ func (dl *diffLayer) persist(force bool) (layer, error) {

// diffToDisk merges a bottom-most diff into the persistent disk layer underneath
// it. The method will panic if called onto a non-bottom-most diff layer.
func diffToDisk(layer *diffLayer, force bool) (layer, error) {
func diffToDisk(layer *diffLayer, force bool) (*diskLayer, error) {
disk, ok := layer.parentLayer().(*diskLayer)
if !ok {
panic(fmt.Sprintf("unknown layer type: %T", layer.parentLayer()))
Expand Down
2 changes: 1 addition & 1 deletion triedb/pathdb/difflayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
func emptyLayer() *diskLayer {
return &diskLayer{
db: New(rawdb.NewMemoryDatabase(), nil, false),
buffer: newNodeBuffer(DefaultBufferSize, nil, 0),
buffer: newNodeBuffer(defaultBufferSize, nil, 0),
}
}

Expand Down
88 changes: 55 additions & 33 deletions triedb/pathdb/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ type diskLayer struct {
db *Database // Path-based trie database
cleans *fastcache.Cache // GC friendly memory cache of clean node RLPs
buffer *nodebuffer // Node buffer to aggregate writes
frozen *nodebuffer // Frozen node buffer waiting for flushing
stale bool // Signals that the layer became stale (state progressed)
lock sync.RWMutex // Lock used to protect stale flag
}

// newDiskLayer creates a new disk layer based on the passing arguments.
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer) *diskLayer {
func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.Cache, buffer *nodebuffer, frozen *nodebuffer) *diskLayer {
// Initialize a clean cache if the memory allowance is not zero
// or reuse the provided cache if it is not nil (inherited from
// the original disk layer).
Expand All @@ -54,6 +55,7 @@ func newDiskLayer(root common.Hash, id uint64, db *Database, cleans *fastcache.C
db: db,
cleans: cleans,
buffer: buffer,
frozen: frozen,
}
}

Expand Down Expand Up @@ -102,16 +104,19 @@ func (dl *diskLayer) node(owner common.Hash, path []byte, depth int) ([]byte, co
if dl.stale {
return nil, common.Hash{}, nil, errSnapshotStale
}
// Try to retrieve the trie node from the not-yet-written
// node buffer first. Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the
// layer as stale.
n, found := dl.buffer.node(owner, path)
if found {
dirtyHitMeter.Mark(1)
dirtyReadMeter.Mark(int64(len(n.Blob)))
dirtyNodeHitDepthHist.Update(int64(depth))
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
// Try to retrieve the trie node from the not-yet-written node buffer first
// (both the live one and the frozen one). Note the buffer is lock free since
// it's impossible to mutate the buffer before tagging the layer as stale.
for _, buffer := range []*nodebuffer{dl.buffer, dl.frozen} {
if buffer != nil {
n, found := buffer.node(owner, path)
if found {
dirtyHitMeter.Mark(1)
dirtyReadMeter.Mark(int64(len(n.Blob)))
dirtyNodeHitDepthHist.Update(int64(depth))
return n.Blob, n.Hash, &nodeLoc{loc: locDirtyCache, depth: depth}, nil
}
}
}
dirtyMissMeter.Mark(1)

Expand Down Expand Up @@ -182,29 +187,51 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
// Mark the diskLayer as stale before applying any mutations on top.
dl.stale = true

// Store the root->id lookup afterwards. All stored lookups are identified
// Store the root->id lookup afterward. All stored lookups are identified
// by the **unique** state root. It's impossible that in the same chain
// blocks are not adjacent but have the same root.
if dl.id == 0 {
rawdb.WriteStateID(dl.db.diskdb, dl.root, 0)
}
rawdb.WriteStateID(dl.db.diskdb, bottom.rootHash(), bottom.stateID())

// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, dl.buffer.commit(bottom.nodes))

// In a unique scenario where the ID of the oldest history object (after tail
// truncation) surpasses the persisted state ID, we take the necessary action
// of forcibly committing the cached dirty nodes to ensure that the persisted
// of forcibly committing the cached dirty states to ensure that the persisted
// state ID remains higher.
if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
persistedID := rawdb.ReadPersistentStateID(dl.db.diskdb)
if !force && persistedID < oldest {
force = true
}
if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
return nil, err
// Merge the nodes of the bottom-most diff layer into the buffer as the combined one
combined := dl.buffer.commit(bottom.nodes)
if combined.full() || force {
// Wait until the previous frozen buffer is fully flushed
if dl.frozen != nil {
if err := dl.frozen.flushed(); err != nil {
return nil, err
}
}
dl.frozen = nil

// Freeze the live buffer and schedule background flushing
dl.frozen = combined
dl.frozen.flush(dl.db.diskdb, dl.cleans, bottom.stateID())

// Block until the frozen buffer is fully flushed out if the oldest history
// surpasses the persisted state ID.
if persistedID < oldest {
if err := dl.frozen.flushed(); err != nil {
return nil, err
}
}
combined = newNodeBuffer(dl.db.bufferSize, nil, 0)
}
// Construct a new disk layer by merging the nodes from the provided diff
// layer, and flush the content in disk layer if there are too many nodes
// cached. The clean cache is inherited from the original disk layer.
ndl := newDiskLayer(bottom.root, bottom.stateID(), dl.db, dl.cleans, combined, dl.frozen)

// To remove outdated history objects from the end, we set the 'tail' parameter
// to 'oldest-1' due to the offset between the freezer index and the history ID.
if overflow {
Expand Down Expand Up @@ -249,25 +276,20 @@ func (dl *diskLayer) revert(h *history) (*diskLayer, error) {
return nil, err
}
} else {
// Block until the frozen buffer is fully flushed
if dl.frozen != nil {
if err := dl.frozen.flushed(); err != nil {
return nil, err
}
}
batch := dl.db.diskdb.NewBatch()
writeNodes(batch, nodes, dl.cleans)
rawdb.WritePersistentStateID(batch, dl.id-1)
if err := batch.Write(); err != nil {
log.Crit("Failed to write states", "err", err)
}
}
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer), nil
}

// setBufferSize sets the node buffer size to the provided value.
func (dl *diskLayer) setBufferSize(size int) error {
dl.lock.RLock()
defer dl.lock.RUnlock()

if dl.stale {
return errSnapshotStale
}
return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
return newDiskLayer(h.meta.parent, dl.id-1, dl.db, dl.cleans, dl.buffer, dl.frozen), nil
}

// size returns the approximate size of cached nodes in the disk layer.
Expand Down
9 changes: 7 additions & 2 deletions triedb/pathdb/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (db *Database) loadLayers() layer {
log.Info("Failed to load journal, discard it", "err", err)
}
// Return single layer with persistent state.
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0))
return newDiskLayer(root, rawdb.ReadPersistentStateID(db.diskdb), db, nil, newNodeBuffer(db.bufferSize, nil, 0), nil)
}

// loadDiskLayer reads the binary blob from the layer journal, reconstructing
Expand Down Expand Up @@ -176,7 +176,7 @@ func (db *Database) loadDiskLayer(r *rlp.Stream) (layer, error) {
nodes[entry.Owner] = subset
}
// Calculate the internal state transitions by id difference.
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored))
base := newDiskLayer(root, id, db, nil, newNodeBuffer(db.bufferSize, nodes, id-stored), nil)
return base, nil
}

Expand Down Expand Up @@ -342,6 +342,11 @@ func (db *Database) Journal(root common.Hash) error {
return fmt.Errorf("triedb layer [%#x] missing", root)
}
disk := db.tree.bottom()
if disk.frozen != nil {
if err := disk.frozen.flushed(); err != nil {
return err
}
}
if l, ok := l.(*diffLayer); ok {
log.Info("Persisting dirty state to disk", "head", l.block, "root", root, "layers", l.id-disk.id+disk.buffer.layers)
} else { // disk layer only on noop runs (likely) or deep reorgs (unlikely)
Expand Down
6 changes: 6 additions & 0 deletions triedb/pathdb/layertree.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func (tree *layerTree) cap(root common.Hash, layers int) error {
if err != nil {
return err
}
// Block until the frozen buffer is fully flushed
if base.frozen != nil {
if err := base.frozen.flushed(); err != nil {
return err
}
}
// Replace the entire layer tree with the flat base
tree.layers = map[common.Hash]layer{base.rootHash(): base}
return nil
Expand Down
Loading

0 comments on commit 569b961

Please sign in to comment.