From e5bbb927e00eac66b0ec2099a6d8c2f10497eee5 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 22 Jul 2024 19:10:24 -0700 Subject: [PATCH 1/8] Wait for engine sync on startup --- packages/replayor/service.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/packages/replayor/service.go b/packages/replayor/service.go index 019ba43..cb3cb7e 100644 --- a/packages/replayor/service.go +++ b/packages/replayor/service.go @@ -4,11 +4,14 @@ import ( "context" "errors" "sync/atomic" + "time" "github.com/danyalprout/replayor/packages/clients" "github.com/danyalprout/replayor/packages/config" "github.com/danyalprout/replayor/packages/stats" "github.com/danyalprout/replayor/packages/strategies" + "github.com/ethereum-optimism/optimism/op-service/eth" + "github.com/ethereum-optimism/optimism/op-service/retry" "github.com/ethereum/go-ethereum/log" ) @@ -41,6 +44,22 @@ func (r *Service) Start(ctx context.Context) error { panic(err) } + retry.Do(ctx, 720, retry.Fixed(10*time.Second), func() (bool, error) { + result, err := r.clients.EngineApi.ForkchoiceUpdate(ctx, ð.ForkchoiceState{ + HeadBlockHash: currentBlock.Hash(), + SafeBlockHash: currentBlock.Hash(), + FinalizedBlockHash: currentBlock.Hash(), + }, nil) + if err != nil { + r.log.Info("waiting for engine API to stop syncing", "err", err) + return false, err + } else if result.PayloadStatus.Status != eth.ExecutionValid { + r.log.Info("waiting for execution API to stop syncing", "status", result.PayloadStatus.Status) + return false, errors.New("syncing") + } + return true, nil + }) + if r.cfg.BenchmarkStartBlock != 0 { if currentBlock.NumberU64() < r.cfg.BenchmarkStartBlock { walkUpToBlock := NewBenchmark( From 9c66f220a72161f89ae51da873e6f0052f27f78f Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Thu, 25 Jul 2024 09:44:18 -0400 Subject: [PATCH 2/8] Improve tracer perf, resilience --- packages/replayor/benchmark.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/packages/replayor/benchmark.go b/packages/replayor/benchmark.go index fefdcdd..46ba61f 100644 --- a/packages/replayor/benchmark.go +++ b/packages/replayor/benchmark.go @@ -246,11 +246,15 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) { if r.benchmarkOpcodes { s.OpCodes = make(map[string]stats.OpCodeStats) + tracerOptions := map[string]any{ + "disableStack": true, + "disableStorage": true, + } for _, receipt := range receipts { txTrace, err := retry.Do(ctx, 10, retry.Exponential(), func() (*TxTrace, error) { var txTrace TxTrace - err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, map[string]string{}) + err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, tracerOptions) if err != nil { return nil, err } @@ -258,6 +262,11 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) { }) if err != nil { r.log.Warn("unable to load tx trace", "err", err) + s.OpCodes["UNKNOWN"] = stats.OpCodeStats{ + Count: s.OpCodes["UNKNOWN"].Count + 1, + Gas: s.OpCodes["UNKNOWN"].Gas + receipt.GasUsed, + } + continue } var prevOpCode string From 0c929cce1cb62f87ddf65503beb461546ae6fc51 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 29 Jul 2024 13:29:11 -0700 Subject: [PATCH 3/8] Improve tracing correctness --- packages/replayor/benchmark.go | 62 +--------------------- packages/replayor/trace.go | 94 ++++++++++++++++++++++++++++++---- 2 files changed, 87 insertions(+), 69 deletions(-) diff --git a/packages/replayor/benchmark.go b/packages/replayor/benchmark.go index 46ba61f..d440dc2 100644 --- a/packages/replayor/benchmark.go +++ b/packages/replayor/benchmark.go @@ -240,71 +240,13 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) { for _, receipt := range receipts { if receipt.Status == types.ReceiptStatusSuccessful { success += 1 + s.GasUsed += receipt.GasUsed } } s.Success = float64(success) / float64(len(receipts)) if r.benchmarkOpcodes { - s.OpCodes = make(map[string]stats.OpCodeStats) - tracerOptions := map[string]any{ - "disableStack": true, - "disableStorage": true, - } - - for _, receipt := range receipts { - txTrace, err := retry.Do(ctx, 10, retry.Exponential(), func() (*TxTrace, error) { - var txTrace TxTrace - err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, tracerOptions) - if err != nil { - return nil, err - } - return &txTrace, nil - }) - if err != nil { - r.log.Warn("unable to load tx trace", "err", err) - s.OpCodes["UNKNOWN"] = stats.OpCodeStats{ - Count: s.OpCodes["UNKNOWN"].Count + 1, - Gas: s.OpCodes["UNKNOWN"].Gas + receipt.GasUsed, - } - continue - } - - var prevOpCode string - prevGas := txTrace.Gas - var gasUsage uint64 - callStack := make([]string, 0, 1024) - for _, log := range txTrace.StructLogs { - prevDepth := uint64(len(callStack)) - if log.Depth > prevDepth { - // track the caller opcode - callStack = append(callStack, prevOpCode) - } else if log.Depth < prevDepth { - // account for refunded call stack gas - caller := callStack[len(callStack)-1] - callStack = callStack[:len(callStack)-1] - s.OpCodes[caller] = stats.OpCodeStats{ - Count: s.OpCodes[caller].Count + 1, - Gas: s.OpCodes[caller].Gas + log.Gas - prevGas, - } - gasUsage += log.Gas - prevGas - } else { - s.OpCodes[prevOpCode] = stats.OpCodeStats{ - Count: s.OpCodes[prevOpCode].Count + 1, - Gas: s.OpCodes[prevOpCode].Gas + prevGas - log.Gas, - } - gasUsage += prevGas - log.Gas - } - prevOpCode = log.Op - prevGas = log.Gas - } - - s.OpCodes["TRANSACTION"] = stats.OpCodeStats{ - Count: s.OpCodes["TRANSACTION"].Count + 1, - Gas: s.OpCodes["TRANSACTION"].Gas + receipt.GasUsed - gasUsage, - } - - s.GasUsed += receipt.GasUsed - } + r.computeTraceStats(ctx, s, receipts) } } diff --git a/packages/replayor/trace.go b/packages/replayor/trace.go index 18dc8bd..5bc9f90 100644 --- a/packages/replayor/trace.go +++ b/packages/replayor/trace.go @@ -1,13 +1,89 @@ package replayor +import ( + "context" + + "github.com/danyalprout/replayor/packages/stats" + "github.com/ethereum-optimism/optimism/op-service/retry" + "github.com/ethereum/go-ethereum/core/types" +) + type TxTrace struct { - Gas uint64 `json:"gas"` - Failed bool `json:"failed"` - StructLogs []struct { - Op string `json:"op"` - Gas uint64 `json:"gas"` - GasCost uint64 `json:"gasCost"` - Refund uint64 `json:"refund"` - Depth uint64 `json:"depth"` - } `json:"structLogs"` + Gas uint64 `json:"gas"` + Failed bool `json:"failed"` + StructLogs []StructLog `json:"structLogs"` +} + +type StructLog struct { + Op string `json:"op"` + Gas uint64 `json:"gas"` + GasCost uint64 `json:"gasCost"` + Refund uint64 `json:"refund"` + Depth uint64 `json:"depth"` +} + +var tracerOptions = map[string]any{ + "disableStack": true, + "disableStorage": true, +} + +func (r *Benchmark) computeTraceStats(ctx context.Context, s *stats.BlockCreationStats, receipts []*types.Receipt) { + s.OpCodes = make(map[string]stats.OpCodeStats) + + for _, receipt := range receipts { + r.traceReceipt(ctx, receipt, s.OpCodes) + } +} + +func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, opCodes map[string]stats.OpCodeStats) { + txTrace, err := retry.Do(ctx, 10, retry.Exponential(), func() (*TxTrace, error) { + var txTrace TxTrace + err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, tracerOptions) + if err != nil { + return nil, err + } + return &txTrace, nil + }) + if err != nil { + r.log.Warn("unable to load tx trace", "err", err) + opCodes["UNKNOWN"] = stats.OpCodeStats{ + Count: opCodes["UNKNOWN"].Count + 1, + Gas: opCodes["UNKNOWN"].Gas + receipt.GasUsed, + } + return + } + + var gasUsage uint64 + for idx, log := range txTrace.StructLogs { + opGas := log.GasCost + + var nextLog *StructLog + if idx < len(txTrace.StructLogs)-1 { + nextLog = &txTrace.StructLogs[idx+1] + } + if nextLog != nil && log.Depth < nextLog.Depth { + opGas = log.GasCost - nextLog.Gas + } + + opCodes[log.Op] = stats.OpCodeStats{ + Count: opCodes[log.Op].Count + 1, + Gas: opCodes[log.Op].Gas + opGas, + } + + gasUsage += opGas + } + + gasRefund := txTrace.StructLogs[len(txTrace.StructLogs)-1].Refund + + opCodes["REFUND"] = stats.OpCodeStats{ + Count: opCodes["REFUND"].Count + 1, + Gas: opCodes["REFUND"].Gas + gasRefund, + } + + gasUsage -= gasRefund + + opCodes["TRANSACTION"] = stats.OpCodeStats{ + Count: opCodes["TRANSACTION"].Count + 1, + Gas: opCodes["TRANSACTION"].Gas + receipt.GasUsed - gasUsage, + } } From a0a40c6cd9404e68486b788b51572113fa384b82 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 29 Jul 2024 16:11:18 -0700 Subject: [PATCH 4/8] Fix handling of gas refunds --- packages/replayor/trace.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/replayor/trace.go b/packages/replayor/trace.go index 5bc9f90..04256d7 100644 --- a/packages/replayor/trace.go +++ b/packages/replayor/trace.go @@ -54,6 +54,7 @@ func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, op } var gasUsage uint64 + var gasRefund uint64 for idx, log := range txTrace.StructLogs { opGas := log.GasCost @@ -71,10 +72,9 @@ func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, op } gasUsage += opGas + gasRefund = log.Refund } - gasRefund := txTrace.StructLogs[len(txTrace.StructLogs)-1].Refund - opCodes["REFUND"] = stats.OpCodeStats{ Count: opCodes["REFUND"].Count + 1, Gas: opCodes["REFUND"].Gas + gasRefund, From 1ef17952f60f0aff712e02e41166afed7331f060 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 29 Jul 2024 22:24:07 -0700 Subject: [PATCH 5/8] Fix static call gas --- packages/replayor/trace.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/replayor/trace.go b/packages/replayor/trace.go index 04256d7..3c7806c 100644 --- a/packages/replayor/trace.go +++ b/packages/replayor/trace.go @@ -62,8 +62,12 @@ func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, op if idx < len(txTrace.StructLogs)-1 { nextLog = &txTrace.StructLogs[idx+1] } - if nextLog != nil && log.Depth < nextLog.Depth { - opGas = log.GasCost - nextLog.Gas + if nextLog != nil { + if log.Depth < nextLog.Depth { + opGas = log.GasCost - nextLog.Gas + } else if log.Depth == nextLog.Depth { + opGas = log.Gas - nextLog.Gas + } } opCodes[log.Op] = stats.OpCodeStats{ From 3ec29f7d3e78105344da29614f50c2aa0b03bcc7 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Tue, 30 Jul 2024 15:21:00 -0700 Subject: [PATCH 6/8] Improve intrinsic tx gas calc --- packages/replayor/trace.go | 36 +++++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/packages/replayor/trace.go b/packages/replayor/trace.go index 3c7806c..6f273ac 100644 --- a/packages/replayor/trace.go +++ b/packages/replayor/trace.go @@ -36,6 +36,21 @@ func (r *Benchmark) computeTraceStats(ctx context.Context, s *stats.BlockCreatio } func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, opCodes map[string]stats.OpCodeStats) { + tx, err := retry.Do(ctx, 10, retry.Exponential(), func() (*types.Transaction, error) { + tx, _, err := r.clients.DestNode.TransactionByHash(ctx, receipt.TxHash) + return tx, err + }) + if err != nil { + r.log.Warn("unable to load tx", "err", err) + opCodes["UNKNOWN"] = stats.OpCodeStats{ + Count: opCodes["UNKNOWN"].Count + 1, + Gas: opCodes["UNKNOWN"].Gas + receipt.GasUsed, + } + return + } + + gasLimit := tx.Gas() + txTrace, err := retry.Do(ctx, 10, retry.Exponential(), func() (*TxTrace, error) { var txTrace TxTrace err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, tracerOptions) @@ -53,8 +68,19 @@ func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, op return } - var gasUsage uint64 + gasUsage := txTrace.Gas + + if len(txTrace.StructLogs) > 0 { + gasUsage = gasLimit - txTrace.StructLogs[0].Gas + } + + opCodes["TRANSACTION"] = stats.OpCodeStats{ + Count: opCodes["TRANSACTION"].Count + 1, + Gas: opCodes["TRANSACTION"].Gas + gasUsage, + } + var gasRefund uint64 + for idx, log := range txTrace.StructLogs { opGas := log.GasCost @@ -75,7 +101,6 @@ func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, op Gas: opCodes[log.Op].Gas + opGas, } - gasUsage += opGas gasRefund = log.Refund } @@ -83,11 +108,4 @@ func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, op Count: opCodes["REFUND"].Count + 1, Gas: opCodes["REFUND"].Gas + gasRefund, } - - gasUsage -= gasRefund - - opCodes["TRANSACTION"] = stats.OpCodeStats{ - Count: opCodes["TRANSACTION"].Count + 1, - Gas: opCodes["TRANSACTION"].Gas + receipt.GasUsed - gasUsage, - } } From f7a57d123248cb64b2133c04a64be95306e0c068 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 5 Aug 2024 09:00:29 -0700 Subject: [PATCH 7/8] Fix concurrency bug --- packages/replayor/benchmark.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/replayor/benchmark.go b/packages/replayor/benchmark.go index d440dc2..b1ae196 100644 --- a/packages/replayor/benchmark.go +++ b/packages/replayor/benchmark.go @@ -66,7 +66,7 @@ func (r *Benchmark) loadBlocks(ctx context.Context) { for i := uint64(0); i < concurrency; i++ { blockNum := blockStartRange + i - go func(index uint64) { + go func(index, blockNum uint64) { defer wg.Done() block, err := r.getBlockFromSourceNode(ctx, blockNum) @@ -81,7 +81,7 @@ func (r *Benchmark) loadBlocks(ctx context.Context) { m.Lock() results[index] = block m.Unlock() - }(i) + }(i, blockNum) } wg.Wait() From 5c54f228f2eb5fd7d1a4cc64dc917fa6cc95e4c7 Mon Sep 17 00:00:00 2001 From: Brian Bland Date: Mon, 5 Aug 2024 11:29:43 -0700 Subject: [PATCH 8/8] Fix stats concurrency issues --- packages/replayor/benchmark.go | 37 +++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/packages/replayor/benchmark.go b/packages/replayor/benchmark.go index b1ae196..b10f3a8 100644 --- a/packages/replayor/benchmark.go +++ b/packages/replayor/benchmark.go @@ -221,6 +221,7 @@ func (r *Benchmark) addBlock(ctx context.Context, currentBlock strategies.BlockC if r.remainingBlockCount == 0 { r.log.Info("finished processing blocks") + r.recordStats.Close() return } @@ -250,15 +251,23 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) { } } -func (r *Benchmark) enrichAndRecordStats(ctx context.Context) { +func (r *Benchmark) enrichAndRecordStats(ctx context.Context) chan any { + var wg sync.WaitGroup + wg.Add(5) for i := 0; i < 5; i++ { go func() { + defer wg.Done() for { select { - case stats := <-r.recordStats.Out(): + case stats, ok := <-r.recordStats.Out(): + if !ok { + return + } r.enrich(ctx, &stats) + r.sm.Lock() r.s.RecordBlockStats(stats) + r.sm.Unlock() r.log.Debug("block stats", "BlockNumber", stats.BlockNumber, "BlockHash", stats.BlockHash, "TxnCount", stats.TxnCount, "TotalTime", stats.TotalTime, "FCUTime", stats.FCUTime, "GetTime", stats.GetTime, "NewTime", stats.NewTime, "FCUNoAttrsTime", stats.FCUNoAttrsTime, "Success", stats.Success, "GasUsed", stats.GasUsed, "GasLimit", stats.GasLimit) case <-ctx.Done(): @@ -267,6 +276,15 @@ func (r *Benchmark) enrichAndRecordStats(ctx context.Context) { } }() } + + doneChan := make(chan any) + + go func() { + wg.Wait() + close(doneChan) + }() + + return doneChan } func (r *Benchmark) submitBlocks(ctx context.Context) { @@ -311,7 +329,7 @@ func (r *Benchmark) Run(ctx context.Context) { go r.loadBlocks(ctx) go r.mapBlocks(ctx) go r.submitBlocks(ctx) - r.enrichAndRecordStats(ctx) + doneChan := r.enrichAndRecordStats(ctx) ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -322,6 +340,10 @@ func (r *Benchmark) Run(ctx context.Context) { for { select { + case <-doneChan: + r.log.Info("writing block stats") + r.s.Write(ctx) + return case <-ticker.C: currentBlock, err := r.clients.DestNode.BlockByNumber(ctx, nil) if err != nil { @@ -331,15 +353,6 @@ func (r *Benchmark) Run(ctx context.Context) { l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "statProgress", r.recordStats.Len(), "remaining", r.remainingBlockCount) lastBlockNum = currentBlock.NumberU64() - - r.sm.Lock() - rc := r.remainingBlockCount - r.sm.Unlock() - - if rc == 0 { - r.s.Write(ctx) - return - } case <-ctx.Done(): return }