Skip to content

Commit

Permalink
Fix stats concurrency issues
Browse files Browse the repository at this point in the history
  • Loading branch information
BrianBland committed Aug 5, 2024
1 parent f7a57d1 commit 5c54f22
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions packages/replayor/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func (r *Benchmark) addBlock(ctx context.Context, currentBlock strategies.BlockC

if r.remainingBlockCount == 0 {
r.log.Info("finished processing blocks")
r.recordStats.Close()
return
}

Expand Down Expand Up @@ -250,15 +251,23 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) {
}
}

func (r *Benchmark) enrichAndRecordStats(ctx context.Context) {
func (r *Benchmark) enrichAndRecordStats(ctx context.Context) chan any {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
for {
select {
case stats := <-r.recordStats.Out():
case stats, ok := <-r.recordStats.Out():
if !ok {
return
}
r.enrich(ctx, &stats)

r.sm.Lock()
r.s.RecordBlockStats(stats)
r.sm.Unlock()

r.log.Debug("block stats", "BlockNumber", stats.BlockNumber, "BlockHash", stats.BlockHash, "TxnCount", stats.TxnCount, "TotalTime", stats.TotalTime, "FCUTime", stats.FCUTime, "GetTime", stats.GetTime, "NewTime", stats.NewTime, "FCUNoAttrsTime", stats.FCUNoAttrsTime, "Success", stats.Success, "GasUsed", stats.GasUsed, "GasLimit", stats.GasLimit)
case <-ctx.Done():
Expand All @@ -267,6 +276,15 @@ func (r *Benchmark) enrichAndRecordStats(ctx context.Context) {
}
}()
}

doneChan := make(chan any)

go func() {
wg.Wait()
close(doneChan)
}()

return doneChan
}

func (r *Benchmark) submitBlocks(ctx context.Context) {
Expand Down Expand Up @@ -311,7 +329,7 @@ func (r *Benchmark) Run(ctx context.Context) {
go r.loadBlocks(ctx)
go r.mapBlocks(ctx)
go r.submitBlocks(ctx)
r.enrichAndRecordStats(ctx)
doneChan := r.enrichAndRecordStats(ctx)

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand All @@ -322,6 +340,10 @@ func (r *Benchmark) Run(ctx context.Context) {

for {
select {
case <-doneChan:
r.log.Info("writing block stats")
r.s.Write(ctx)
return
case <-ticker.C:
currentBlock, err := r.clients.DestNode.BlockByNumber(ctx, nil)
if err != nil {
Expand All @@ -331,15 +353,6 @@ func (r *Benchmark) Run(ctx context.Context) {
l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "statProgress", r.recordStats.Len(), "remaining", r.remainingBlockCount)

lastBlockNum = currentBlock.NumberU64()

r.sm.Lock()
rc := r.remainingBlockCount
r.sm.Unlock()

if rc == 0 {
r.s.Write(ctx)
return
}
case <-ctx.Done():
return
}
Expand Down

0 comments on commit 5c54f22

Please sign in to comment.