Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix various concurrency and tracing issues #6

Merged
merged 8 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 29 additions & 65 deletions packages/replayor/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *Benchmark) loadBlocks(ctx context.Context) {
for i := uint64(0); i < concurrency; i++ {
blockNum := blockStartRange + i

go func(index uint64) {
go func(index, blockNum uint64) {
defer wg.Done()

block, err := r.getBlockFromSourceNode(ctx, blockNum)
Expand All @@ -81,7 +81,7 @@ func (r *Benchmark) loadBlocks(ctx context.Context) {
m.Lock()
results[index] = block
m.Unlock()
}(i)
}(i, blockNum)
}

wg.Wait()
Expand Down Expand Up @@ -221,6 +221,7 @@ func (r *Benchmark) addBlock(ctx context.Context, currentBlock strategies.BlockC

if r.remainingBlockCount == 0 {
r.log.Info("finished processing blocks")
r.recordStats.Close()
return
}

Expand All @@ -240,74 +241,33 @@ func (r *Benchmark) enrich(ctx context.Context, s *stats.BlockCreationStats) {
for _, receipt := range receipts {
if receipt.Status == types.ReceiptStatusSuccessful {
success += 1
s.GasUsed += receipt.GasUsed
}
}
s.Success = float64(success) / float64(len(receipts))

if r.benchmarkOpcodes {
s.OpCodes = make(map[string]stats.OpCodeStats)

for _, receipt := range receipts {
txTrace, err := retry.Do(ctx, 10, retry.Exponential(), func() (*TxTrace, error) {
var txTrace TxTrace
err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, map[string]string{})
if err != nil {
return nil, err
}
return &txTrace, nil
})
if err != nil {
r.log.Warn("unable to load tx trace", "err", err)
}

var prevOpCode string
prevGas := txTrace.Gas
var gasUsage uint64
callStack := make([]string, 0, 1024)
for _, log := range txTrace.StructLogs {
prevDepth := uint64(len(callStack))
if log.Depth > prevDepth {
// track the caller opcode
callStack = append(callStack, prevOpCode)
} else if log.Depth < prevDepth {
// account for refunded call stack gas
caller := callStack[len(callStack)-1]
callStack = callStack[:len(callStack)-1]
s.OpCodes[caller] = stats.OpCodeStats{
Count: s.OpCodes[caller].Count + 1,
Gas: s.OpCodes[caller].Gas + log.Gas - prevGas,
}
gasUsage += log.Gas - prevGas
} else {
s.OpCodes[prevOpCode] = stats.OpCodeStats{
Count: s.OpCodes[prevOpCode].Count + 1,
Gas: s.OpCodes[prevOpCode].Gas + prevGas - log.Gas,
}
gasUsage += prevGas - log.Gas
}
prevOpCode = log.Op
prevGas = log.Gas
}

s.OpCodes["TRANSACTION"] = stats.OpCodeStats{
Count: s.OpCodes["TRANSACTION"].Count + 1,
Gas: s.OpCodes["TRANSACTION"].Gas + receipt.GasUsed - gasUsage,
}

s.GasUsed += receipt.GasUsed
}
r.computeTraceStats(ctx, s, receipts)
}
}

func (r *Benchmark) enrichAndRecordStats(ctx context.Context) {
func (r *Benchmark) enrichAndRecordStats(ctx context.Context) chan any {
var wg sync.WaitGroup
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
for {
select {
case stats := <-r.recordStats.Out():
case stats, ok := <-r.recordStats.Out():
if !ok {
return
}
r.enrich(ctx, &stats)

r.sm.Lock()
r.s.RecordBlockStats(stats)
r.sm.Unlock()

r.log.Debug("block stats", "BlockNumber", stats.BlockNumber, "BlockHash", stats.BlockHash, "TxnCount", stats.TxnCount, "TotalTime", stats.TotalTime, "FCUTime", stats.FCUTime, "GetTime", stats.GetTime, "NewTime", stats.NewTime, "FCUNoAttrsTime", stats.FCUNoAttrsTime, "Success", stats.Success, "GasUsed", stats.GasUsed, "GasLimit", stats.GasLimit)
case <-ctx.Done():
Expand All @@ -316,6 +276,15 @@ func (r *Benchmark) enrichAndRecordStats(ctx context.Context) {
}
}()
}

doneChan := make(chan any)

go func() {
wg.Wait()
close(doneChan)
}()

return doneChan
}

func (r *Benchmark) submitBlocks(ctx context.Context) {
Expand Down Expand Up @@ -360,7 +329,7 @@ func (r *Benchmark) Run(ctx context.Context) {
go r.loadBlocks(ctx)
go r.mapBlocks(ctx)
go r.submitBlocks(ctx)
r.enrichAndRecordStats(ctx)
doneChan := r.enrichAndRecordStats(ctx)

ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
Expand All @@ -371,6 +340,10 @@ func (r *Benchmark) Run(ctx context.Context) {

for {
select {
case <-doneChan:
r.log.Info("writing block stats")
r.s.Write(ctx)
return
case <-ticker.C:
currentBlock, err := r.clients.DestNode.BlockByNumber(ctx, nil)
if err != nil {
Expand All @@ -380,15 +353,6 @@ func (r *Benchmark) Run(ctx context.Context) {
l.Info("replay progress", "blocks", currentBlock.NumberU64()-lastBlockNum, "incomingBlocks", len(r.incomingBlocks), "processBlocks", len(r.processBlocks), "currentBlock", currentBlock.NumberU64(), "statProgress", r.recordStats.Len(), "remaining", r.remainingBlockCount)

lastBlockNum = currentBlock.NumberU64()

r.sm.Lock()
rc := r.remainingBlockCount
r.sm.Unlock()

if rc == 0 {
r.s.Write(ctx)
return
}
case <-ctx.Done():
return
}
Expand Down
19 changes: 19 additions & 0 deletions packages/replayor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"errors"
"sync/atomic"
"time"

"github.com/danyalprout/replayor/packages/clients"
"github.com/danyalprout/replayor/packages/config"
"github.com/danyalprout/replayor/packages/stats"
"github.com/danyalprout/replayor/packages/strategies"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/log"
)

Expand Down Expand Up @@ -41,6 +44,22 @@ func (r *Service) Start(ctx context.Context) error {
panic(err)
}

retry.Do(ctx, 720, retry.Fixed(10*time.Second), func() (bool, error) {
result, err := r.clients.EngineApi.ForkchoiceUpdate(ctx, &eth.ForkchoiceState{
HeadBlockHash: currentBlock.Hash(),
SafeBlockHash: currentBlock.Hash(),
FinalizedBlockHash: currentBlock.Hash(),
}, nil)
if err != nil {
r.log.Info("waiting for engine API to stop syncing", "err", err)
return false, err
} else if result.PayloadStatus.Status != eth.ExecutionValid {
r.log.Info("waiting for execution API to stop syncing", "status", result.PayloadStatus.Status)
return false, errors.New("syncing")
}
return true, nil
})

if r.cfg.BenchmarkStartBlock != 0 {
if currentBlock.NumberU64() < r.cfg.BenchmarkStartBlock {
walkUpToBlock := NewBenchmark(
Expand Down
116 changes: 107 additions & 9 deletions packages/replayor/trace.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,111 @@
package replayor

import (
"context"

"github.com/danyalprout/replayor/packages/stats"
"github.com/ethereum-optimism/optimism/op-service/retry"
"github.com/ethereum/go-ethereum/core/types"
)

type TxTrace struct {
Gas uint64 `json:"gas"`
Failed bool `json:"failed"`
StructLogs []struct {
Op string `json:"op"`
Gas uint64 `json:"gas"`
GasCost uint64 `json:"gasCost"`
Refund uint64 `json:"refund"`
Depth uint64 `json:"depth"`
} `json:"structLogs"`
Gas uint64 `json:"gas"`
Failed bool `json:"failed"`
StructLogs []StructLog `json:"structLogs"`
}

type StructLog struct {
Op string `json:"op"`
Gas uint64 `json:"gas"`
GasCost uint64 `json:"gasCost"`
Refund uint64 `json:"refund"`
Depth uint64 `json:"depth"`
}

var tracerOptions = map[string]any{
"disableStack": true,
"disableStorage": true,
}

func (r *Benchmark) computeTraceStats(ctx context.Context, s *stats.BlockCreationStats, receipts []*types.Receipt) {
s.OpCodes = make(map[string]stats.OpCodeStats)

for _, receipt := range receipts {
r.traceReceipt(ctx, receipt, s.OpCodes)
}
}

func (r *Benchmark) traceReceipt(ctx context.Context, receipt *types.Receipt, opCodes map[string]stats.OpCodeStats) {
tx, err := retry.Do(ctx, 10, retry.Exponential(), func() (*types.Transaction, error) {
tx, _, err := r.clients.DestNode.TransactionByHash(ctx, receipt.TxHash)
return tx, err
})
if err != nil {
r.log.Warn("unable to load tx", "err", err)
opCodes["UNKNOWN"] = stats.OpCodeStats{
Count: opCodes["UNKNOWN"].Count + 1,
Gas: opCodes["UNKNOWN"].Gas + receipt.GasUsed,
}
return
}

gasLimit := tx.Gas()

txTrace, err := retry.Do(ctx, 10, retry.Exponential(), func() (*TxTrace, error) {
var txTrace TxTrace
err := r.clients.DestNode.Client().Call(&txTrace, "debug_traceTransaction", receipt.TxHash, tracerOptions)
if err != nil {
return nil, err
}
return &txTrace, nil
})
if err != nil {
r.log.Warn("unable to load tx trace", "err", err)
opCodes["UNKNOWN"] = stats.OpCodeStats{
Count: opCodes["UNKNOWN"].Count + 1,
Gas: opCodes["UNKNOWN"].Gas + receipt.GasUsed,
}
return
}

gasUsage := txTrace.Gas

if len(txTrace.StructLogs) > 0 {
gasUsage = gasLimit - txTrace.StructLogs[0].Gas
}

opCodes["TRANSACTION"] = stats.OpCodeStats{
Count: opCodes["TRANSACTION"].Count + 1,
Gas: opCodes["TRANSACTION"].Gas + gasUsage,
}

var gasRefund uint64

for idx, log := range txTrace.StructLogs {
opGas := log.GasCost

var nextLog *StructLog
if idx < len(txTrace.StructLogs)-1 {
nextLog = &txTrace.StructLogs[idx+1]
}
if nextLog != nil {
if log.Depth < nextLog.Depth {
opGas = log.GasCost - nextLog.Gas
} else if log.Depth == nextLog.Depth {
opGas = log.Gas - nextLog.Gas
}
}

opCodes[log.Op] = stats.OpCodeStats{
Count: opCodes[log.Op].Count + 1,
Gas: opCodes[log.Op].Gas + opGas,
}

gasRefund = log.Refund
}

opCodes["REFUND"] = stats.OpCodeStats{
Count: opCodes["REFUND"].Count + 1,
Gas: opCodes["REFUND"].Gas + gasRefund,
}
}