Skip to content

Commit

Permalink
Enrich stats immediately after each block
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianBland committed Aug 9, 2024
1 parent c44d257 commit 5ba0cf0
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 38 deletions.
45 changes: 9 additions & 36 deletions packages/replayor/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package replayor

import (
"context"
"fmt"
"math/big"
"sync"
"time"
Expand Down Expand Up @@ -33,7 +34,6 @@ type Benchmark struct {

incomingBlocks chan *types.Block
processBlocks chan strategies.BlockCreationParams
recordStats chan stats.BlockCreationStats

previousReplayedBlockHash common.Hash
strategy strategies.Strategy
Expand All @@ -45,6 +45,7 @@ type Benchmark struct {
endBlockNum uint64

benchmarkOpcodes bool
diffStorage bool
}

func (r *Benchmark) getBlockFromSourceNode(ctx context.Context, blockNum uint64) (*types.Block, error) {
Expand Down Expand Up @@ -74,7 +75,7 @@ func (r *Benchmark) loadBlocks(ctx context.Context) {
}

if block == nil {
panic(err)
panic(fmt.Errorf("unexpected nil block: %d", blockNum))
}

m.Lock()
Expand Down Expand Up @@ -213,7 +214,8 @@ func (r *Benchmark) addBlock(ctx context.Context, currentBlock strategies.BlockC

r.previousReplayedBlockHash = envelope.ExecutionPayload.BlockHash

r.recordStats <- stats
r.enrich(ctx, &stats)
r.s.RecordBlockStats(stats)
}

func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) {
Expand All @@ -237,41 +239,12 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) {
r.computeTraceStats(ctx, s, receipts)
}

func (r *Benchmark) enrichAndRecordStats(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
for {
select {
case stats, ok := <-r.recordStats:
if !ok {
return
}
r.enrich(ctx, &stats)

r.sm.Lock()
r.s.RecordBlockStats(stats)
r.sm.Unlock()

r.log.Debug("block stats", "BlockNumber", stats.BlockNumber, "BlockHash", stats.BlockHash, "TxnCount", stats.TxnCount, "TotalTime", stats.TotalTime, "FCUTime", stats.FCUTime, "GetTime", stats.GetTime, "NewTime", stats.NewTime, "FCUNoAttrsTime", stats.FCUNoAttrsTime, "Success", stats.Success, "GasUsed", stats.GasUsed, "GasLimit", stats.GasLimit)
case <-ctx.Done():
return
}
}
}()
}
wg.Wait()
}

func (r *Benchmark) submitBlocks(ctx context.Context) {
for {
select {
case block, ok := <-r.processBlocks:
if block.Number > r.endBlockNum || !ok {
r.log.Info("stopping block processing")
close(r.recordStats)
return
}

Expand Down Expand Up @@ -315,7 +288,6 @@ func (r *Benchmark) Run(ctx context.Context) {
go r.mapBlocks(ctx)
go func() {
r.submitBlocks(ctx)
r.enrichAndRecordStats(ctx)
close(doneChan)
}()

Expand All @@ -338,7 +310,7 @@ func (r *Benchmark) Run(ctx context.Context) {
l.Error("unable to load current block", "err", err)
}

l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "statProgress", len(r.recordStats), "remaining", r.remainingBlockCount)
l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "remaining", r.remainingBlockCount)

lastBlockNum = currentBlock.NumberU64()
case <-ctx.Done():
Expand All @@ -357,14 +329,14 @@ func NewBenchmark(
s stats.Stats,
currentBlock *types.Block,
benchmarkBlockCount uint64,
benchmarkOpcodes bool) *Benchmark {
benchmarkOpcodes bool,
diffStorage bool) *Benchmark {
r := &Benchmark{
clients: c,
rollupCfg: rollupCfg,
log: logger,
incomingBlocks: make(chan *types.Block, 25),
processBlocks: make(chan strategies.BlockCreationParams, 25),
recordStats: make(chan stats.BlockCreationStats, benchmarkBlockCount),
strategy: strategy,
s: s,
currentBlock: currentBlock,
Expand All @@ -373,6 +345,7 @@ func NewBenchmark(
remainingBlockCount: benchmarkBlockCount,
previousReplayedBlockHash: currentBlock.Hash(),
benchmarkOpcodes: benchmarkOpcodes,
diffStorage: diffStorage,
}

return r
Expand Down
3 changes: 2 additions & 1 deletion packages/replayor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (r *Service) Start(ctx context.Context) error {
&stats.NoOpStats{},
currentBlock,
r.cfg.BenchmarkStartBlock-currentBlock.NumberU64(),
false,
false)

walkUpToBlock.Run(cCtx)
Expand All @@ -94,7 +95,7 @@ func (r *Service) Start(ctx context.Context) error {
panic(err)
}

benchmark := NewBenchmark(r.clients, r.cfg.RollupConfig, r.log, strategy, r.stats, currentBlock, uint64(r.cfg.BlockCount), r.cfg.BenchmarkOpcodes)
benchmark := NewBenchmark(r.clients, r.cfg.RollupConfig, r.log, strategy, r.stats, currentBlock, uint64(r.cfg.BlockCount), r.cfg.BenchmarkOpcodes, true)
benchmark.Run(cCtx)

return nil
Expand Down
5 changes: 4 additions & 1 deletion packages/replayor/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ func (r *Benchmark) computeTraceStats(ctx context.Context, s *stats.BlockCreatio
r.traceReceipt(ctx, receipt, s.OpCodes)
}
}
r.recordStorageChanges(ctx, s)

if r.diffStorage {
r.recordStorageChanges(ctx, s)
}
}

func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, opCodes map[string]stats.OpCodeStats) {
Expand Down

0 comments on commit 5ba0cf0

Please sign in to comment.