Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proactive Syncer #1911

Merged
merged 61 commits into from
Mar 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
3b70670
chore: update license-header's year to 2025
Feb 3, 2025
2ca6b17
feat(x/dsmr): NewTypedClient
Feb 3, 2025
0ca9b2f
feat: block window syncer
Feb 3, 2025
b3bf68b
docs: remove unnecessary comment
Feb 3, 2025
961575d
lint: add no lint directive for random slice index generation turning…
Feb 3, 2025
feaa056
Merge branch 'main' of github.com:ava-labs/hypersdk into proactive-bl…
Feb 7, 2025
e66b699
feat: block window syncer
Feb 7, 2025
79ced7a
chore: lint & documentation
Feb 11, 2025
eea3d1e
test
Feb 12, 2025
85b6059
Merge branch 'main' into proactive-block-fetcher
Elvis339 Feb 12, 2025
a61fdfc
feat(pebble): Extended database.Database interface with ExtendedDatab…
Feb 17, 2025
914b030
feat(chainindex): optimize block pruning with range deletion
Feb 17, 2025
7134e40
refactor(typedclient): move typed client to internal
Feb 17, 2025
51e14c2
feat(chainindex): make chaindex use fallback instead of error
Feb 17, 2025
5658934
feat(syncer): remove blockwindowsyncer and move everything to interna…
Feb 17, 2025
c0a3eee
Merge remote-tracking branch 'origin/proactive-block-fetcher' into pr…
Feb 17, 2025
286f7a4
chore: revert change in vm.go
Feb 17, 2025
daf58c3
Merge branch 'main' into proactive-block-fetcher
Elvis339 Feb 17, 2025
de181e7
chore: lint
Feb 18, 2025
d8ffa94
feat(chainindex): revert changes to chainindex
Feb 19, 2025
0b829de
feat: simplify
Feb 19, 2025
6c3fcfe
feat: remove unused RangeDeleterDB interface
Feb 19, 2025
1fd5e86
feat: handle checkpoint's data race issues
Feb 19, 2025
8459ead
test(client): exit from the loop in after successful error catch
Feb 19, 2025
5470a95
lint
Feb 19, 2025
fa4a02e
ci: run clean before unit tests
Feb 19, 2025
8705f18
ci: remove run clean before unit tests
Feb 19, 2025
4885581
Refactor validity window tests and improve block fetch logic
Feb 20, 2025
61e8cad
chore: lint
Feb 20, 2025
7144a3c
feat: Fix error handling and ensure proper channel closing.
Feb 20, 2025
abbf4f2
feat: fix cancel order and improve block handling clarity
Feb 20, 2025
d123556
Merge branch 'main' of github.com:ava-labs/hypersdk into proactive-bl…
Elvis339 Feb 24, 2025
166197d
feat: enhance error messages in transaction workload tests
Elvis339 Feb 24, 2025
9f8d8fb
test: tmp add logs
Elvis339 Feb 24, 2025
3832397
lint
Elvis339 Feb 25, 2025
2329733
feat: simplify error handling in validity window sync fixing data race
Elvis339 Feb 26, 2025
7d392c7
chore: rename parseErr to err
Elvis339 Feb 26, 2025
c6266ed
Merge branch 'main' into proactive-block-fetcher
Elvis339 Feb 26, 2025
3a2627d
nits
Elvis339 Feb 27, 2025
4a09b9d
Merge remote-tracking branch 'origin/proactive-block-fetcher' into pr…
Elvis339 Feb 27, 2025
f5a3bd7
Merge branch 'main' into proactive-block-fetcher
Elvis339 Feb 28, 2025
62c9079
feat: use upstream peer sampler
Elvis339 Feb 28, 2025
ebc593a
chore: repalce fmt.Sprintf
Elvis339 Feb 28, 2025
a28f6d1
revert to eq check instead of errors.Is
Elvis339 Feb 28, 2025
8af8b4b
chore: bump avalanchego dep and go version to 1.23.6 required by aval…
Elvis339 Mar 3, 2025
85f96c4
Merge remote-tracking branch 'origin/proactive-block-fetcher' into pr…
Elvis339 Mar 3, 2025
f6af908
feat: replace async block fetching with sync impl
Elvis339 Mar 3, 2025
ac91c5f
revert changes to node_test.go
Elvis339 Mar 3, 2025
6ad4818
feat: add `AcceptHistorical` to validity window for backfilling histo…
Elvis339 Mar 3, 2025
fc7747e
address PR comments
Elvis339 Mar 5, 2025
2d9c9b3
docs
Elvis339 Mar 6, 2025
8a7cf18
test(syncer)
Elvis339 Mar 6, 2025
c00ab44
test(sync_client): add license header
Elvis339 Mar 6, 2025
596b56d
docs(syncer)
Elvis339 Mar 6, 2025
002ee78
Merge branch 'main' of github.com:ava-labs/hypersdk into proactive-bl…
Elvis339 Mar 10, 2025
96f9b63
lint
Elvis339 Mar 10, 2025
e20446f
test(sync_client)
Elvis339 Mar 11, 2025
88a84d6
Update internal/validitywindow/syncer.go
Elvis339 Mar 11, 2025
22b3434
address PR comments
Elvis339 Mar 11, 2025
bbb2926
Merge remote-tracking branch 'origin/proactive-block-fetcher' into pr…
Elvis339 Mar 11, 2025
d47c31d
test: avoid depending on std time
Elvis339 Mar 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewDefaultConfig() Config {
}

// TODO: replace with AvalancheGo implementation (if still used for Vryx)
func New(file string, cfg Config, registerer prometheus.Registerer) (*Database, error) {
func New(dirName string, cfg Config, registerer prometheus.Registerer) (*Database, error) {
// These default settings are based on https://github.com/ethereum/go-ethereum/blob/master/ethdb/pebble/pebble.go
d := &Database{closing: make(chan struct{})}
opts := &pebble.Options{
Expand Down Expand Up @@ -96,7 +96,7 @@ func New(file string, cfg Config, registerer prometheus.Registerer) (*Database,
return nil, err
}
d.metrics = metrics
db, err := pebble.Open(file, opts)
db, err := pebble.Open(dirName, opts)
if err != nil {
return nil, err
}
Expand Down
85 changes: 85 additions & 0 deletions internal/typedclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (C) 2025, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package typedclient

import (
"context"
"fmt"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/set"
)

type Marshaler[T any, U any, V any] interface {
MarshalRequest(T) ([]byte, error)
UnmarshalResponse([]byte) (U, error)
MarshalGossip(V) ([]byte, error)
}

type TypedClient[T any, U any, V any] struct {
client *p2p.Client
marshaler Marshaler[T, U, V]
}

func NewTypedClient[T any, U any, V any](client *p2p.Client, marshaler Marshaler[T, U, V]) *TypedClient[T, U, V] {
return &TypedClient[T, U, V]{
client: client,
marshaler: marshaler,
}
}

func (t *TypedClient[T, U, _]) AppRequest(
ctx context.Context,
nodeID ids.NodeID,
request T,
onResponse func(ctx context.Context, nodeID ids.NodeID, response U, err error),
) error {
onByteResponse := func(ctx context.Context, nodeID ids.NodeID, responseBytes []byte, err error) {
if err != nil {
onResponse(ctx, nodeID, utils.Zero[U](), err)
return
}

response, parseErr := t.marshaler.UnmarshalResponse(responseBytes)
if parseErr != nil {
onResponse(ctx, nodeID, utils.Zero[U](), parseErr)
return
}

onResponse(ctx, nodeID, response, err)
}

requestBytes, err := t.marshaler.MarshalRequest(request)
if err != nil {
return fmt.Errorf("failed to marshal request: %w", err)
}

return t.client.AppRequest(
ctx,
set.Of(nodeID),
requestBytes,
onByteResponse,
)
}

func (t *TypedClient[T, U, V]) AppGossip(
ctx context.Context,
gossip V,
) error {
gossipBytes, err := t.marshaler.MarshalGossip(gossip)
if err != nil {
return fmt.Errorf("failed to marshal gossip: %w", err)
}

return t.client.AppGossip(
ctx,
common.SendConfig{
Validators: 100,
},
gossipBytes,
)
}
60 changes: 60 additions & 0 deletions internal/typedclient/sync_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package typedclient

import (
"context"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/utils"
)

// SyncTypedClient provides a synchronous wrapper around TypedClient
type SyncTypedClient[T any, U any, V any] struct {
client *TypedClient[T, U, V]
}

// NewSyncTypedClient creates a new synchronous client
func NewSyncTypedClient[T any, U any, V any](client *p2p.Client, marshaler Marshaler[T, U, V]) *SyncTypedClient[T, U, V] {
return &SyncTypedClient[T, U, V]{
client: NewTypedClient(client, marshaler),
}
}

// SyncAppRequest sends a request and waits for the response synchronously
func (s *SyncTypedClient[T, U, V]) SyncAppRequest(
ctx context.Context,
nodeID ids.NodeID,
request T,
) (U, error) {
var (
response U
respErr error
done = make(chan struct{})
)

// We are guaranteed to eventually receive a response
onResponse := func(_ context.Context, _ ids.NodeID, resp U, err error) {
select {
case <-done:
return
default:
response = resp
respErr = err
close(done)
}
}

if err := s.client.AppRequest(ctx, nodeID, request, onResponse); err != nil {
return utils.Zero[U](), err
}

select {
case <-done:
return response, respErr
case <-ctx.Done():
return utils.Zero[U](), ctx.Err()
}
}
166 changes: 166 additions & 0 deletions internal/typedclient/sync_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
// Copyright (C) 2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package typedclient

import (
"context"
"testing"
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p/p2ptest"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/stretchr/testify/require"
)

var _ Marshaler[testRequest, testResponse, []byte] = (*testMarshaler)(nil)

func TestSyncAppRequest(t *testing.T) {
tests := []struct {
name string
errResponse bool
networkError bool
expectedErrContains string
responseDelay time.Duration
contextTimeout time.Duration
}{
{
name: "successful request",
contextTimeout: 2 * time.Second,
},
{
name: "context timeout",
responseDelay: 1500 * time.Millisecond,
contextTimeout: time.Second,
expectedErrContains: "context deadline exceeded",
},
{
name: "error response",
contextTimeout: 2 * time.Second,
expectedErrContains: "simulated error",
errResponse: true,
},
}

// Enforce run in parallel
t.Parallel()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := require.New(t)
ctx, cancel := context.WithTimeout(context.Background(), tt.contextTimeout)
defer cancel()

handlerDone := make(chan struct{})
handler := &testHandler{
simulateError: tt.errResponse,
responseDelay: tt.responseDelay,
doneCh: handlerDone,
response: []byte(tt.name),
}

handlerID := ids.GenerateTestNodeID()

marshaler := testMarshaler{}
client := p2ptest.NewSelfClient(t, ctx, handlerID, handler)
syncClient := NewSyncTypedClient[testRequest, testResponse, []byte](client, &marshaler)

response, err := withContextEnforcement(ctx, tt.expectedErrContains, func() (testResponse, error) {
return syncClient.SyncAppRequest(ctx, handlerID, testRequest{message: "What is the current test name?"})
})

if len(tt.expectedErrContains) > 0 {
r.Error(err)
r.ErrorContains(err, tt.expectedErrContains)
r.Equal(utils.Zero[testResponse](), response)
} else {
r.NoError(err)
r.Equal(tt.name, response.reply)
}
})
}
}

// testHandler simulates different response behaviors
type testHandler struct {
simulateError bool
responseDelay time.Duration
response []byte
doneCh chan struct{}
}

func (t *testHandler) Close() error {
if t.doneCh != nil {
close(t.doneCh)
}
return nil
}

func (t *testHandler) AppRequest(
ctx context.Context,
_ ids.NodeID,
_ time.Time,
_ []byte,
) ([]byte, *common.AppError) {
if t.simulateError {
return nil, &common.AppError{Code: 1, Message: "simulated error"}
}

if t.responseDelay > 0 {
select {
case <-time.After(t.responseDelay):
// Delay completed
case <-ctx.Done():
// Context was canceled before delay completed
return nil, &common.AppError{
Code: 2,
Message: ctx.Err().Error(),
}
}
}

return t.response, nil
}

// AppGossip is required by the interface but not used in this test
func (*testHandler) AppGossip(
_ context.Context,
_ ids.NodeID,
_ []byte,
) {
}

type testRequest struct {
message string
}

type testResponse struct {
reply string
}

type testMarshaler struct{}

func (testMarshaler) MarshalRequest(request testRequest) ([]byte, error) {
return []byte(request.message), nil
}

func (testMarshaler) UnmarshalResponse(bytes []byte) (testResponse, error) {
return testResponse{string(bytes)}, nil
}

func (testMarshaler) MarshalGossip(bytes []byte) ([]byte, error) {
return bytes, nil
}

// Ensure the context is cancelled before checking results
func withContextEnforcement(ctx context.Context, expectedErr string, fn func() (testResponse, error)) (testResponse, error) {
// If we expect a context deadline error, just wait for the context to be cancelled
if expectedErr == "context deadline exceeded" {
<-ctx.Done()
return utils.Zero[testResponse](), ctx.Err()
}

// For other cases, run the function normally
return fn()
}
Loading
Loading