Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExecutionClient returns PromiseInterface #2856

Open
wants to merge 40 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
ac5e94c
ExecutionClient.ResultAtPos returns PromiseInterface
diegoximenes Dec 24, 2024
d61fd79
ExecutionClient.DigestMessage returns PromiseInterface
diegoximenes Dec 24, 2024
b3bc9e5
ExecutionClient.Reorg returns PromiseInterface
diegoximenes Dec 24, 2024
6d5daff
ExecutionClient.HeadMessageNumber returns PromiseInterface
diegoximenes Dec 24, 2024
0968d34
ExecutionClient.HeadMessageNumberSync returns PromiseInterface
diegoximenes Dec 24, 2024
1758a48
Fixes context usage when awaiting promises returned by ExecutionClient
diegoximenes Dec 24, 2024
db0d8b9
Fixes reorg_requesequencing_test
diegoximenes Dec 24, 2024
05a1165
Fixes context used for execution client in transaction_streamer
diegoximenes Dec 24, 2024
095bec6
Moves MarkFeedStart to ExecutionClient
diegoximenes Dec 26, 2024
cdbbd6f
NonFullExecutionClient
diegoximenes Dec 26, 2024
e853505
Renames NonFullExecutionClient to ExecutionClientImpl
diegoximenes Dec 26, 2024
349f63b
Fix lint issue
diegoximenes Dec 26, 2024
27f3b5c
Removes HeadMessageNumberSync from ExecutionClient
diegoximenes Dec 30, 2024
0939197
Uses ExecutionClient instead of ExecutionSequencer in TransactionStre…
diegoximenes Dec 30, 2024
0fe2544
Moves Maintenance from FullExecutionClient to ExecutionClient
diegoximenes Dec 30, 2024
2e0009f
Uses ExecutionClient instead of FullExecutionClient in MaintenanceRunner
diegoximenes Dec 30, 2024
40bf8d2
ExecutionClient.Maintenance returns promise
diegoximenes Dec 30, 2024
244fcdf
Refactor CreateNode
diegoximenes Dec 31, 2024
ae83358
Renames CreateNode to CreateNodeFullExecutionClient
diegoximenes Dec 31, 2024
d2be58a
Uses more specific execution interfaces in createNodeImpl
diegoximenes Jan 1, 2025
2f04936
ExecutionBatchPoster
diegoximenes Jan 1, 2025
84b3997
Get rid of FullExecutionClient
diegoximenes Jan 1, 2025
94fe9b1
Checks execution clients in CreateNodeFullExecutionClient
diegoximenes Jan 2, 2025
d5bff33
Moves execution clients nil check to get funcs
diegoximenes Jan 2, 2025
9b7609e
Pass config to get functions
diegoximenes Jan 2, 2025
c0893a6
Split arguments of get functions in multiple lines
diegoximenes Jan 2, 2025
188896a
Get rid of ExecutionClientImpl
diegoximenes Jan 2, 2025
25a962c
Fixes inbox_test
diegoximenes Jan 2, 2025
710d138
Fixes Node.Start nil reference
diegoximenes Jan 2, 2025
3bca9c7
ExecutionClient.Start and ExecutionClient.StopAndWait return promises
diegoximenes Jan 3, 2025
dbf36cd
Fixes challengetests
diegoximenes Jan 3, 2025
8aa0c74
Fixes snap sync test procedure when creating arb node
diegoximenes Jan 3, 2025
54973bb
Avoids using context.Background() in transaction streamer
diegoximenes Jan 15, 2025
0cfed49
Fixes maintenance test
diegoximenes Feb 18, 2025
e3e6e65
ExecutionClient.MessageIndexToBlockNumber returns Promise
diegoximenes Feb 18, 2025
8ae04c7
ExecutionClient.BlockNumberToMessageIndex returns Promise
diegoximenes Feb 18, 2025
5b6185f
ExecutionClient.SetFinalityData returns Promise
diegoximenes Feb 20, 2025
d9fb740
Adds BlockMetadataFetcher to DefaultConfig back
diegoximenes Feb 20, 2025
c565f83
Simplifies MarkFeedStart
diegoximenes Feb 20, 2025
b3afdeb
Removes unused ExecutionNode.HeadMessageNumberSync
diegoximenes Feb 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type BatchPoster struct {
l1Reader *headerreader.HeaderReader
inbox *InboxTracker
streamer *TransactionStreamer
arbOSVersionGetter execution.FullExecutionClient
arbOSVersionGetter execution.ExecutionBatchPoster
config BatchPosterConfigFetcher
seqInbox *bridgegen.SequencerInbox
syncMonitor *SyncMonitor
Expand Down Expand Up @@ -307,7 +307,7 @@ type BatchPosterOpts struct {
L1Reader *headerreader.HeaderReader
Inbox *InboxTracker
Streamer *TransactionStreamer
VersionGetter execution.FullExecutionClient
VersionGetter execution.ExecutionBatchPoster
SyncMonitor *SyncMonitor
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
Expand Down
23 changes: 17 additions & 6 deletions arbnode/blockmetadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func NewBlockMetadataFetcher(ctx context.Context, c BlockMetadataFetcherConfig,
var trackBlockMetadataFrom arbutil.MessageIndex
var err error
if startPos != 0 {
trackBlockMetadataFrom, err = exec.BlockNumberToMessageIndex(startPos)
trackBlockMetadataFrom, err = exec.BlockNumberToMessageIndex(startPos).Await(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -83,11 +83,11 @@ func (b *BlockMetadataFetcher) fetch(ctx context.Context, fromBlock, toBlock uin
return result, nil
}

func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []gethexec.NumberAndBlockMetadata) error {
func (b *BlockMetadataFetcher) persistBlockMetadata(ctx context.Context, query []uint64, result []gethexec.NumberAndBlockMetadata) error {
batch := b.db.NewBatch()
queryMap := util.ArrayToSet(query)
for _, elem := range result {
pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber)
pos, err := b.exec.BlockNumberToMessageIndex(elem.BlockNumber).Await(ctx)
if err != nil {
return err
}
Expand All @@ -112,16 +112,27 @@ func (b *BlockMetadataFetcher) persistBlockMetadata(query []uint64, result []get

func (b *BlockMetadataFetcher) Update(ctx context.Context) time.Duration {
handleQuery := func(query []uint64) bool {
fromBlock, err := b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])).Await(ctx)
if err != nil {
log.Error("Error getting fromBlock", "err", err)
return false
}
toBlock, err := b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])).Await(ctx)
if err != nil {
log.Error("Error getting toBlock", "err", err)
return false
}

result, err := b.fetch(
ctx,
b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[0])),
b.exec.MessageIndexToBlockNumber(arbutil.MessageIndex(query[len(query)-1])),
fromBlock,
toBlock,
)
if err != nil {
log.Error("Error getting result from bulk blockMetadata API", "err", err)
return false
}
if err = b.persistBlockMetadata(query, result); err != nil {
if err = b.persistBlockMetadata(ctx, query, result); err != nil {
log.Error("Error committing result from bulk blockMetadata API to ArbDB", "err", err)
return false
}
Expand Down
2 changes: 1 addition & 1 deletion arbnode/consensus_execution_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *ConsensusExecutionSyncer) pushFinalityDataFromConsensusToExecution(ctx
ValidatedMsgCount: &validatedMsgCount,
}

err = c.execClient.SetFinalityData(ctx, finalityData)
_, err = c.execClient.SetFinalityData(ctx, finalityData).Await(ctx)
if err != nil {
log.Error("Error pushing finality data from consensus to execution", "err", err)
} else {
Expand Down
4 changes: 2 additions & 2 deletions arbnode/delayed_seq_reorg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestSequencerReorgFromDelayed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
exec, streamer, db, _ := NewTransactionStreamerForTest(t, ctx, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

Expand Down Expand Up @@ -219,7 +219,7 @@ func TestSequencerReorgFromLastDelayedMsg(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exec, streamer, db, _ := NewTransactionStreamerForTest(t, common.Address{})
exec, streamer, db, _ := NewTransactionStreamerForTest(t, ctx, common.Address{})
tracker, err := NewInboxTracker(db, streamer, nil, DefaultSnapSyncConfig)
Require(t, err)

Expand Down
82 changes: 70 additions & 12 deletions arbnode/inbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,32 +23,89 @@ import (
"github.com/offchainlabs/nitro/arbos/l2pricing"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/execution/gethexec"
"github.com/offchainlabs/nitro/statetransfer"
"github.com/offchainlabs/nitro/util/arbmath"
"github.com/offchainlabs/nitro/util/containers"
"github.com/offchainlabs/nitro/util/testhelpers"
"github.com/offchainlabs/nitro/util/testhelpers/env"
)

type execClientWrapper struct {
*gethexec.ExecutionEngine
t *testing.T
ExecutionEngine *gethexec.ExecutionEngine
t *testing.T
}

func (w *execClientWrapper) Pause() { w.t.Error("not supported") }
func (w *execClientWrapper) Activate() { w.t.Error("not supported") }
func (w *execClientWrapper) Pause() { w.t.Error("not supported") }

func (w *execClientWrapper) Activate() { w.t.Error("not supported") }

func (w *execClientWrapper) ForwardTo(url string) error { w.t.Error("not supported"); return nil }
func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }

func (w *execClientWrapper) SequenceDelayedMessage(message *arbostypes.L1IncomingMessage, delayedSeqNum uint64) error {
return w.ExecutionEngine.SequenceDelayedMessage(message, delayedSeqNum)
}

func (w *execClientWrapper) NextDelayedMessageNumber() (uint64, error) {
return w.ExecutionEngine.NextDelayedMessageNumber()
}

func (w *execClientWrapper) MarkFeedStart(to arbutil.MessageIndex) containers.PromiseInterface[struct{}] {
markFeedStartWithReturn := func(to arbutil.MessageIndex) (struct{}, error) {
w.ExecutionEngine.MarkFeedStart(to)
return struct{}{}, nil
}
return containers.NewReadyPromise(markFeedStartWithReturn(to))
}

func (w *execClientWrapper) Maintenance() containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, nil)
}

func (w *execClientWrapper) Synced() bool { w.t.Error("not supported"); return false }

func (w *execClientWrapper) FullSyncProgressMap() map[string]interface{} {
w.t.Error("not supported")
return nil
}
func (w *execClientWrapper) SetFinalityData(ctx context.Context, finalityData *arbutil.FinalityData) error {
w.t.Error("not supported")
return nil
func (w *execClientWrapper) SetFinalityData(ctx context.Context, finalityData *arbutil.FinalityData) containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, nil)
}

func (w *execClientWrapper) DigestMessage(num arbutil.MessageIndex, msg *arbostypes.MessageWithMetadata, msgForPrefetch *arbostypes.MessageWithMetadata) containers.PromiseInterface[*execution.MessageResult] {
return containers.NewReadyPromise(w.ExecutionEngine.DigestMessage(num, msg, msgForPrefetch))
}

func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
func (w *execClientWrapper) Reorg(count arbutil.MessageIndex, newMessages []arbostypes.MessageWithMetadataAndBlockInfo, oldMessages []*arbostypes.MessageWithMetadata) containers.PromiseInterface[[]*execution.MessageResult] {
return containers.NewReadyPromise(w.ExecutionEngine.Reorg(count, newMessages, oldMessages))
}

func (w *execClientWrapper) HeadMessageNumber() containers.PromiseInterface[arbutil.MessageIndex] {
return containers.NewReadyPromise(w.ExecutionEngine.HeadMessageNumber())
}

func (w *execClientWrapper) ResultAtPos(pos arbutil.MessageIndex) containers.PromiseInterface[*execution.MessageResult] {
return containers.NewReadyPromise(w.ExecutionEngine.ResultAtPos(pos))
}

func (w *execClientWrapper) Start(ctx context.Context) containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, nil)
}

func (w *execClientWrapper) MessageIndexToBlockNumber(messageNum arbutil.MessageIndex) containers.PromiseInterface[uint64] {
return containers.NewReadyPromise(w.ExecutionEngine.MessageIndexToBlockNumber(messageNum), nil)
}

func (w *execClientWrapper) BlockNumberToMessageIndex(blockNum uint64) containers.PromiseInterface[arbutil.MessageIndex] {
return containers.NewReadyPromise(w.ExecutionEngine.BlockNumberToMessageIndex(blockNum))
}

func (w *execClientWrapper) StopAndWait() containers.PromiseInterface[struct{}] {
return containers.NewReadyPromise(struct{}{}, nil)
}

func NewTransactionStreamerForTest(t *testing.T, ctx context.Context, ownerAddress common.Address) (*gethexec.ExecutionEngine, *TransactionStreamer, ethdb.Database, *core.BlockChain) {
chainConfig := chaininfo.ArbitrumDevTestChainConfig()

initData := statetransfer.ArbosInitializationInfo{
Expand Down Expand Up @@ -82,7 +139,7 @@ func NewTransactionStreamerForTest(t *testing.T, ownerAddress common.Address) (*
Fail(t, err)
}
execSeq := &execClientWrapper{execEngine, t}
inbox, err := NewTransactionStreamer(arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
inbox, err := NewTransactionStreamer(ctx, arbDb, bc.Config(), execSeq, nil, make(chan error, 1), transactionStreamerConfigFetcher, &DefaultSnapSyncConfig)
if err != nil {
Fail(t, err)
}
Expand All @@ -106,10 +163,11 @@ type blockTestState struct {
func TestTransactionStreamer(t *testing.T) {
ownerAddress := common.HexToAddress("0x1111111111111111111111111111111111111111")

exec, inbox, _, bc := NewTransactionStreamerForTest(t, ownerAddress)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

exec, inbox, _, bc := NewTransactionStreamerForTest(t, ctx, ownerAddress)

err := inbox.Start(ctx)
Require(t, err)
exec.Start(ctx)
Expand Down
7 changes: 4 additions & 3 deletions arbnode/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
type MaintenanceRunner struct {
stopwaiter.StopWaiter

exec execution.FullExecutionClient
exec execution.ExecutionClient
config MaintenanceConfigFetcher
seqCoordinator *SeqCoordinator
dbs []ethdb.Database
Expand Down Expand Up @@ -92,7 +92,7 @@ var DefaultMaintenanceConfig = MaintenanceConfig{

type MaintenanceConfigFetcher func() *MaintenanceConfig

func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCoordinator, dbs []ethdb.Database, exec execution.FullExecutionClient) (*MaintenanceRunner, error) {
func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCoordinator, dbs []ethdb.Database, exec execution.ExecutionClient) (*MaintenanceRunner, error) {
cfg := config()
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("validating config: %w", err)
Expand Down Expand Up @@ -258,7 +258,8 @@ func (mr *MaintenanceRunner) runMaintenance() error {
}
expected++
go func() {
results <- mr.exec.Maintenance()
_, res := mr.exec.Maintenance().Await(mr.GetContext())
results <- res
}()
for i := 0; i < expected; i++ {
subErr := <-results
Expand Down
Loading
Loading