diff --git a/api/indexer/indexer.go b/api/indexer/indexer.go index ea2fd75a25..4e51516094 100644 --- a/api/indexer/indexer.go +++ b/api/indexer/indexer.go @@ -17,8 +17,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/ava-labs/hypersdk/chain" - "github.com/ava-labs/hypersdk/codec" - "github.com/ava-labs/hypersdk/consts" "github.com/ava-labs/hypersdk/event" "github.com/ava-labs/hypersdk/fees" "github.com/ava-labs/hypersdk/internal/cache" @@ -195,26 +193,17 @@ func (*Indexer) storeTransaction( outputs [][]byte, errorStr string, ) error { - outputLength := consts.ByteLen // Single byte containing number of outputs - for _, output := range outputs { - outputLength += consts.Uint32Len + len(output) + storageTx := storageTx{ + Timestamp: timestamp, + Success: success, + Units: units.Bytes(), + Fee: fee, + Outputs: outputs, + Error: errorStr, } - txResultLength := consts.Uint64Len + consts.BoolLen + fees.DimensionsLen + consts.Uint64Len + outputLength - - writer := codec.NewWriter(txResultLength, consts.NetworkSizeLimit) - writer.PackUint64(uint64(timestamp)) - writer.PackBool(success) - writer.PackFixedBytes(units.Bytes()) - writer.PackUint64(fee) - writer.PackByte(byte(len(outputs))) - for _, output := range outputs { - writer.PackBytes(output) - } - writer.PackString(errorStr) - if err := writer.Err(); err != nil { - return err - } - return batch.Put(txID[:], writer.Bytes()) + storageTxBytes := storageTx.MarshalCanoto() + + return batch.Put(txID[:], storageTxBytes) } func (i *Indexer) GetTransaction(txID ids.ID) (bool, int64, bool, fees.Dimensions, uint64, [][]byte, string, error) { @@ -225,26 +214,17 @@ func (i *Indexer) GetTransaction(txID ids.ID) (bool, int64, bool, fees.Dimension if err != nil { return false, 0, false, fees.Dimensions{}, 0, nil, "", err } - reader := codec.NewReader(v, consts.NetworkSizeLimit) - timestamp := reader.UnpackUint64(true) - success := reader.UnpackBool() - dimensionsBytes := make([]byte, fees.DimensionsLen) - reader.UnpackFixedBytes(fees.DimensionsLen, &dimensionsBytes) - fee := reader.UnpackUint64(true) - numOutputs := int(reader.UnpackByte()) - outputs := make([][]byte, numOutputs) - for i := range outputs { - outputs[i] = reader.UnpackLimitedBytes(consts.NetworkSizeLimit) - } - errorStr := reader.UnpackString(false) - if err := reader.Err(); err != nil { - return false, 0, false, fees.Dimensions{}, 0, nil, "", err + + storageTx := storageTx{} + if err := storageTx.UnmarshalCanoto(v); err != nil { + return false, 0, false, fees.Dimensions{}, 0, nil, "", fmt.Errorf("failed to unmarshal storage tx %s: %w", txID, err) } - dimensions, err := fees.UnpackDimensions(dimensionsBytes) + + unpackedUnits, err := fees.UnpackDimensions(storageTx.Units) if err != nil { - return false, 0, false, fees.Dimensions{}, 0, nil, "", err + return false, 0, false, fees.Dimensions{}, 0, nil, "", fmt.Errorf("failed to unpack units for storage tx %s: %w", txID, err) } - return true, int64(timestamp), success, dimensions, fee, outputs, errorStr, nil + return true, storageTx.Timestamp, storageTx.Success, unpackedUnits, storageTx.Fee, storageTx.Outputs, storageTx.Error, nil } func (i *Indexer) Close() error { diff --git a/api/indexer/storage_tx.canoto.go b/api/indexer/storage_tx.canoto.go new file mode 100644 index 0000000000..23c4f6172c --- /dev/null +++ b/api/indexer/storage_tx.canoto.go @@ -0,0 +1,290 @@ +// Code generated by canoto. DO NOT EDIT. +// versions: +// canoto v0.10.0 +// source: api/indexer/storage_tx.go + +package indexer + +import ( + "io" + "sync/atomic" + "unicode/utf8" + + "github.com/ava-labs/hypersdk/internal/canoto" +) + +// Ensure that unused imports do not error +var ( + _ atomic.Int64 + + _ = io.ErrUnexpectedEOF + _ = utf8.ValidString +) + +const ( + canoto__storageTx__Timestamp__tag = "\x08" // canoto.Tag(1, canoto.Varint) + canoto__storageTx__Success__tag = "\x10" // canoto.Tag(2, canoto.Varint) + canoto__storageTx__Units__tag = "\x1a" // canoto.Tag(3, canoto.Len) + canoto__storageTx__Fee__tag = "\x20" // canoto.Tag(4, canoto.Varint) + canoto__storageTx__Outputs__tag = "\x2a" // canoto.Tag(5, canoto.Len) + canoto__storageTx__Error__tag = "\x32" // canoto.Tag(6, canoto.Len) +) + +type canotoData_storageTx struct { + // Enforce noCopy before atomic usage. + // See https://github.com/StephenButtolph/canoto/pull/32 + _ atomic.Int64 + + size int +} + +// MakeCanoto creates a new empty value. +func (*storageTx) MakeCanoto() *storageTx { + return new(storageTx) +} + +// UnmarshalCanoto unmarshals a Canoto-encoded byte slice into the struct. +// +// OneOf fields are cached during the unmarshaling process. +// +// The struct is not cleared before unmarshaling, any fields not present in the +// bytes will retain their previous values. If a OneOf field was previously +// cached as being set, attempting to unmarshal that OneOf again will return +// canoto.ErrDuplicateOneOf. +func (c *storageTx) UnmarshalCanoto(bytes []byte) error { + r := canoto.Reader{ + B: bytes, + } + return c.UnmarshalCanotoFrom(r) +} + +// UnmarshalCanotoFrom populates the struct from a canoto.Reader. Most users +// should just use UnmarshalCanoto. +// +// OneOf fields are cached during the unmarshaling process. +// +// The struct is not cleared before unmarshaling, any fields not present in the +// bytes will retain their previous values. If a OneOf field was previously +// cached as being set, attempting to unmarshal that OneOf again will return +// canoto.ErrDuplicateOneOf. +// +// This function enables configuration of reader options. +func (c *storageTx) UnmarshalCanotoFrom(r canoto.Reader) error { + var minField uint32 + for canoto.HasNext(&r) { + field, wireType, err := canoto.ReadTag(&r) + if err != nil { + return err + } + if field < minField { + return canoto.ErrInvalidFieldOrder + } + + switch field { + case 1: + if wireType != canoto.Varint { + return canoto.ErrUnexpectedWireType + } + + if err := canoto.ReadInt(&r, &c.Timestamp); err != nil { + return err + } + if canoto.IsZero(c.Timestamp) { + return canoto.ErrZeroValue + } + case 2: + if wireType != canoto.Varint { + return canoto.ErrUnexpectedWireType + } + + if err := canoto.ReadBool(&r, &c.Success); err != nil { + return err + } + if canoto.IsZero(c.Success) { + return canoto.ErrZeroValue + } + case 3: + if wireType != canoto.Len { + return canoto.ErrUnexpectedWireType + } + + if err := canoto.ReadBytes(&r, &c.Units); err != nil { + return err + } + if len(c.Units) == 0 { + return canoto.ErrZeroValue + } + case 4: + if wireType != canoto.Varint { + return canoto.ErrUnexpectedWireType + } + + if err := canoto.ReadInt(&r, &c.Fee); err != nil { + return err + } + if canoto.IsZero(c.Fee) { + return canoto.ErrZeroValue + } + case 5: + if wireType != canoto.Len { + return canoto.ErrUnexpectedWireType + } + + remainingBytes := r.B + originalUnsafe := r.Unsafe + r.Unsafe = true + err := canoto.ReadBytes(&r, new([]byte)) + r.Unsafe = originalUnsafe + if err != nil { + return err + } + + count, err := canoto.CountBytes(r.B, canoto__storageTx__Outputs__tag) + if err != nil { + return err + } + c.Outputs = canoto.MakeSlice(c.Outputs, 1+count) + + r.B = remainingBytes + if err := canoto.ReadBytes(&r, &c.Outputs[0]); err != nil { + return err + } + for i := range count { + r.B = r.B[len(canoto__storageTx__Outputs__tag):] + if err := canoto.ReadBytes(&r, &c.Outputs[1+i]); err != nil { + return err + } + } + case 6: + if wireType != canoto.Len { + return canoto.ErrUnexpectedWireType + } + + if err := canoto.ReadString(&r, &c.Error); err != nil { + return err + } + if len(c.Error) == 0 { + return canoto.ErrZeroValue + } + default: + return canoto.ErrUnknownField + } + + minField = field + 1 + } + return nil +} + +// ValidCanoto validates that the struct can be correctly marshaled into the +// Canoto format. +// +// Specifically, ValidCanoto ensures: +// 1. All OneOfs are specified at most once. +// 2. All strings are valid utf-8. +// 3. All custom fields are ValidCanoto. +func (c *storageTx) ValidCanoto() bool { + if c == nil { + return true + } + if !utf8.ValidString(string(c.Error)) { + return false + } + return true +} + +// CalculateCanotoCache populates size and OneOf caches based on the current +// values in the struct. +// +// It is not safe to call this function concurrently. +func (c *storageTx) CalculateCanotoCache() { + if c == nil { + return + } + c.canotoData.size = 0 + if !canoto.IsZero(c.Timestamp) { + c.canotoData.size += len(canoto__storageTx__Timestamp__tag) + canoto.SizeInt(c.Timestamp) + } + if !canoto.IsZero(c.Success) { + c.canotoData.size += len(canoto__storageTx__Success__tag) + canoto.SizeBool + } + if len(c.Units) != 0 { + c.canotoData.size += len(canoto__storageTx__Units__tag) + canoto.SizeBytes(c.Units) + } + if !canoto.IsZero(c.Fee) { + c.canotoData.size += len(canoto__storageTx__Fee__tag) + canoto.SizeInt(c.Fee) + } + for _, v := range c.Outputs { + c.canotoData.size += len(canoto__storageTx__Outputs__tag) + canoto.SizeBytes(v) + } + if len(c.Error) != 0 { + c.canotoData.size += len(canoto__storageTx__Error__tag) + canoto.SizeBytes(c.Error) + } +} + +// CachedCanotoSize returns the previously calculated size of the Canoto +// representation from CalculateCanotoCache. +// +// If CalculateCanotoCache has not yet been called, it will return 0. +// +// If the struct has been modified since the last call to CalculateCanotoCache, +// the returned size may be incorrect. +func (c *storageTx) CachedCanotoSize() int { + if c == nil { + return 0 + } + return c.canotoData.size +} + +// MarshalCanoto returns the Canoto representation of this struct. +// +// It is assumed that this struct is ValidCanoto. +// +// It is not safe to call this function concurrently. +func (c *storageTx) MarshalCanoto() []byte { + c.CalculateCanotoCache() + w := canoto.Writer{ + B: make([]byte, 0, c.CachedCanotoSize()), + } + w = c.MarshalCanotoInto(w) + return w.B +} + +// MarshalCanotoInto writes the struct into a canoto.Writer and returns the +// resulting canoto.Writer. Most users should just use MarshalCanoto. +// +// It is assumed that CalculateCanotoCache has been called since the last +// modification to this struct. +// +// It is assumed that this struct is ValidCanoto. +// +// It is not safe to call this function concurrently. +func (c *storageTx) MarshalCanotoInto(w canoto.Writer) canoto.Writer { + if c == nil { + return w + } + if !canoto.IsZero(c.Timestamp) { + canoto.Append(&w, canoto__storageTx__Timestamp__tag) + canoto.AppendInt(&w, c.Timestamp) + } + if !canoto.IsZero(c.Success) { + canoto.Append(&w, canoto__storageTx__Success__tag) + canoto.AppendBool(&w, true) + } + if len(c.Units) != 0 { + canoto.Append(&w, canoto__storageTx__Units__tag) + canoto.AppendBytes(&w, c.Units) + } + if !canoto.IsZero(c.Fee) { + canoto.Append(&w, canoto__storageTx__Fee__tag) + canoto.AppendInt(&w, c.Fee) + } + for _, v := range c.Outputs { + canoto.Append(&w, canoto__storageTx__Outputs__tag) + canoto.AppendBytes(&w, v) + } + if len(c.Error) != 0 { + canoto.Append(&w, canoto__storageTx__Error__tag) + canoto.AppendBytes(&w, c.Error) + } + return w +} diff --git a/api/indexer/storage_tx.go b/api/indexer/storage_tx.go new file mode 100644 index 0000000000..0b42aeaf4a --- /dev/null +++ b/api/indexer/storage_tx.go @@ -0,0 +1,15 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package indexer + +type storageTx struct { + Timestamp int64 `canoto:"int,1"` + Success bool `canoto:"bool,2"` + Units []byte `canoto:"bytes,3"` + Fee uint64 `canoto:"int,4"` + Outputs [][]byte `canoto:"repeated bytes,5"` + Error string `canoto:"string,6"` + + canotoData canotoData_storageTx +} diff --git a/api/ws/client.go b/api/ws/client.go index 86ae111720..2f4eb3010e 100644 --- a/api/ws/client.go +++ b/api/ws/client.go @@ -84,7 +84,7 @@ func NewWebSocketClient(uri string, handshakeTimeout time.Duration, pending int, utils.Outf("{{orange}}got empty message{{/}}\n") continue } - msgs, err := pubsub.ParseBatchMessage(pubsub.MaxWriteMessageSize, msgBatch) + msgs, err := pubsub.ParseBatchMessage(msgBatch) if err != nil { utils.Outf("{{orange}}received invalid message:{{/}} %v\n", err) continue @@ -177,17 +177,21 @@ func (c *WebSocketClient) RegisterRawTx(txBytes []byte) error { // ListenTx listens for responses from the streamingServer. // +// Returns the txID and result of the transaction. A nil result +// indicates the transaction was marked as expired after accepting +// a block past the expiry time of the tx. +// // TODO: add the option to subscribe to a single TxID to avoid // trampling other listeners (could have an intermediate tracking // layer in the client so no changes required in the server). -func (c *WebSocketClient) ListenTx(ctx context.Context) (ids.ID, error, *chain.Result, error) { +func (c *WebSocketClient) ListenTx(ctx context.Context) (ids.ID, *chain.Result, error) { select { case msg := <-c.pendingTxs: - return UnpackTxMessage(msg) + return unpackTxMessage(msg) case <-c.readStopped: - return ids.Empty, nil, nil, c.err + return ids.Empty, nil, c.err case <-ctx.Done(): - return ids.Empty, nil, nil, ctx.Err() + return ids.Empty, nil, ctx.Err() } } diff --git a/api/ws/packer.canoto.go b/api/ws/packer.canoto.go new file mode 100644 index 0000000000..3d85efe05b --- /dev/null +++ b/api/ws/packer.canoto.go @@ -0,0 +1,207 @@ +// Code generated by canoto. DO NOT EDIT. +// versions: +// canoto v0.10.0 +// source: api/ws/packer.go + +package ws + +import ( + "io" + "sync/atomic" + "unicode/utf8" + + "github.com/ava-labs/hypersdk/internal/canoto" +) + +// Ensure that unused imports do not error +var ( + _ atomic.Int64 + + _ = io.ErrUnexpectedEOF + _ = utf8.ValidString +) + +const ( + canoto__txMessage__TxID__tag = "\x0a" // canoto.Tag(1, canoto.Len) + canoto__txMessage__ResultBytes__tag = "\x12" // canoto.Tag(2, canoto.Len) +) + +type canotoData_txMessage struct { + // Enforce noCopy before atomic usage. + // See https://github.com/StephenButtolph/canoto/pull/32 + _ atomic.Int64 + + size int +} + +// MakeCanoto creates a new empty value. +func (*txMessage) MakeCanoto() *txMessage { + return new(txMessage) +} + +// UnmarshalCanoto unmarshals a Canoto-encoded byte slice into the struct. +// +// OneOf fields are cached during the unmarshaling process. +// +// The struct is not cleared before unmarshaling, any fields not present in the +// bytes will retain their previous values. If a OneOf field was previously +// cached as being set, attempting to unmarshal that OneOf again will return +// canoto.ErrDuplicateOneOf. +func (c *txMessage) UnmarshalCanoto(bytes []byte) error { + r := canoto.Reader{ + B: bytes, + } + return c.UnmarshalCanotoFrom(r) +} + +// UnmarshalCanotoFrom populates the struct from a canoto.Reader. Most users +// should just use UnmarshalCanoto. +// +// OneOf fields are cached during the unmarshaling process. +// +// The struct is not cleared before unmarshaling, any fields not present in the +// bytes will retain their previous values. If a OneOf field was previously +// cached as being set, attempting to unmarshal that OneOf again will return +// canoto.ErrDuplicateOneOf. +// +// This function enables configuration of reader options. +func (c *txMessage) UnmarshalCanotoFrom(r canoto.Reader) error { + var minField uint32 + for canoto.HasNext(&r) { + field, wireType, err := canoto.ReadTag(&r) + if err != nil { + return err + } + if field < minField { + return canoto.ErrInvalidFieldOrder + } + + switch field { + case 1: + if wireType != canoto.Len { + return canoto.ErrUnexpectedWireType + } + + var length int64 + if err := canoto.ReadInt(&r, &length); err != nil { + return err + } + + const ( + expectedLength = len(c.TxID) + expectedLengthInt64 = int64(expectedLength) + ) + if length != expectedLengthInt64 { + return canoto.ErrInvalidLength + } + if expectedLength > len(r.B) { + return io.ErrUnexpectedEOF + } + + copy(c.TxID[:], r.B) + if canoto.IsZero(c.TxID) { + return canoto.ErrZeroValue + } + r.B = r.B[expectedLength:] + case 2: + if wireType != canoto.Len { + return canoto.ErrUnexpectedWireType + } + + if err := canoto.ReadBytes(&r, &c.ResultBytes); err != nil { + return err + } + if len(c.ResultBytes) == 0 { + return canoto.ErrZeroValue + } + default: + return canoto.ErrUnknownField + } + + minField = field + 1 + } + return nil +} + +// ValidCanoto validates that the struct can be correctly marshaled into the +// Canoto format. +// +// Specifically, ValidCanoto ensures: +// 1. All OneOfs are specified at most once. +// 2. All strings are valid utf-8. +// 3. All custom fields are ValidCanoto. +func (c *txMessage) ValidCanoto() bool { + if c == nil { + return true + } + return true +} + +// CalculateCanotoCache populates size and OneOf caches based on the current +// values in the struct. +// +// It is not safe to call this function concurrently. +func (c *txMessage) CalculateCanotoCache() { + if c == nil { + return + } + c.canotoData.size = 0 + if !canoto.IsZero(c.TxID) { + c.canotoData.size += len(canoto__txMessage__TxID__tag) + canoto.SizeBytes(c.TxID[:]) + } + if len(c.ResultBytes) != 0 { + c.canotoData.size += len(canoto__txMessage__ResultBytes__tag) + canoto.SizeBytes(c.ResultBytes) + } +} + +// CachedCanotoSize returns the previously calculated size of the Canoto +// representation from CalculateCanotoCache. +// +// If CalculateCanotoCache has not yet been called, it will return 0. +// +// If the struct has been modified since the last call to CalculateCanotoCache, +// the returned size may be incorrect. +func (c *txMessage) CachedCanotoSize() int { + if c == nil { + return 0 + } + return c.canotoData.size +} + +// MarshalCanoto returns the Canoto representation of this struct. +// +// It is assumed that this struct is ValidCanoto. +// +// It is not safe to call this function concurrently. +func (c *txMessage) MarshalCanoto() []byte { + c.CalculateCanotoCache() + w := canoto.Writer{ + B: make([]byte, 0, c.CachedCanotoSize()), + } + w = c.MarshalCanotoInto(w) + return w.B +} + +// MarshalCanotoInto writes the struct into a canoto.Writer and returns the +// resulting canoto.Writer. Most users should just use MarshalCanoto. +// +// It is assumed that CalculateCanotoCache has been called since the last +// modification to this struct. +// +// It is assumed that this struct is ValidCanoto. +// +// It is not safe to call this function concurrently. +func (c *txMessage) MarshalCanotoInto(w canoto.Writer) canoto.Writer { + if c == nil { + return w + } + if !canoto.IsZero(c.TxID) { + canoto.Append(&w, canoto__txMessage__TxID__tag) + canoto.AppendBytes(&w, c.TxID[:]) + } + if len(c.ResultBytes) != 0 { + canoto.Append(&w, canoto__txMessage__ResultBytes__tag) + canoto.AppendBytes(&w, c.ResultBytes) + } + return w +} diff --git a/api/ws/packer.go b/api/ws/packer.go index a94911627d..229582b862 100644 --- a/api/ws/packer.go +++ b/api/ws/packer.go @@ -4,13 +4,9 @@ package ws import ( - "errors" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/hypersdk/chain" - "github.com/ava-labs/hypersdk/codec" - "github.com/ava-labs/hypersdk/consts" ) const ( @@ -18,47 +14,52 @@ const ( TxMode byte = 1 ) -// Could be a better place for these methods -// Packs an accepted block message -func PackAcceptedTxMessage(txID ids.ID, result *chain.Result) ([]byte, error) { - size := ids.IDLen + consts.BoolLen + result.Size() - p := codec.NewWriter(size, consts.MaxInt) - p.PackID(txID) - p.PackBool(false) - if err := result.Marshal(p); err != nil { - return nil, err - } - return p.Bytes(), p.Err() -} +type txMessage struct { + TxID ids.ID `canoto:"fixed bytes,1"` + ResultBytes []byte `canoto:"bytes,2"` -// Packs a removed block message -func PackRemovedTxMessage(txID ids.ID, err error) ([]byte, error) { - errString := err.Error() - size := ids.IDLen + consts.BoolLen + codec.StringLen(errString) - p := codec.NewWriter(size, consts.MaxInt) - p.PackID(txID) - p.PackBool(true) - p.PackString(errString) - return p.Bytes(), p.Err() + canotoData canotoData_txMessage } -// Unpacks a tx message from [msg]. Returns the txID, an error regarding the status -// of the tx, the result of the tx, and an error if there was a -// problem unpacking the message. -func UnpackTxMessage(msg []byte) (ids.ID, error, *chain.Result, error) { - p := codec.NewReader(msg, consts.MaxInt) - var txID ids.ID - p.UnpackID(true, &txID) - if p.UnpackBool() { - err := p.UnpackString(true) - return ids.Empty, errors.New(err), nil, p.Err() +// packTxMessage packs a txID and result. A nil result indicates the tx was +// marked as expired. +// Expiry is the only failure condition that triggers a notification sent to +// the client. +func packTxMessage(txID ids.ID, result *chain.Result) ([]byte, error) { + var ( + resultBytes []byte + err error + ) + if result != nil { + resultBytes, err = result.Marshal() + if err != nil { + return nil, err + } } - result, err := chain.UnmarshalResult(p) - if err != nil { - return ids.Empty, nil, nil, err + txMessage := txMessage{ + TxID: txID, + ResultBytes: resultBytes, + } + txMessageBytes := txMessage.MarshalCanoto() + return txMessageBytes, nil +} + +// unpackTxMessage unpacks a txID and result. A nil result indicates the +// tx was marked as expired (only failure condition that triggers a notification). +func unpackTxMessage(txMsgBytes []byte) (ids.ID, *chain.Result, error) { + txMessage := txMessage{} + if err := txMessage.UnmarshalCanoto(txMsgBytes); err != nil { + return ids.Empty, nil, err } - if !p.Empty() { - return ids.Empty, nil, nil, chain.ErrInvalidObject + var ( + result *chain.Result + err error + ) + if len(txMessage.ResultBytes) > 0 { + result, err = chain.UnmarshalResult(txMessage.ResultBytes) + if err != nil { + return ids.Empty, nil, err + } } - return txID, nil, result, p.Err() + return txMessage.TxID, result, nil } diff --git a/api/ws/server.go b/api/ws/server.go index 6151cf04f9..891d88aa40 100644 --- a/api/ws/server.go +++ b/api/ws/server.go @@ -5,7 +5,6 @@ package ws import ( "context" - "errors" "sync" "github.com/ava-labs/avalanchego/ids" @@ -28,11 +27,7 @@ const ( Namespace = "websocket" ) -var ( - _ api.HandlerFactory[api.VM] = (*WebSocketServerFactory)(nil) - - ErrExpired = errors.New("expired") -) +var _ api.HandlerFactory[api.VM] = (*WebSocketServerFactory)(nil) type Config struct { Enabled bool `json:"enabled"` @@ -153,12 +148,13 @@ func (w *WebSocketServer) AddTxListener(tx *chain.Transaction, c *pubsub.Connect w.expiringTxs.Add([]*chain.Transaction{tx}) } -func (w *WebSocketServer) removeTx(txID ids.ID, err error) error { +func (w *WebSocketServer) expireTx(txID ids.ID) error { listeners, ok := w.txListeners[txID] if !ok { return nil } - bytes, err := PackRemovedTxMessage(txID, err) + // nil result indicates the transaction expired + bytes, err := packTxMessage(txID, nil) if err != nil { return err } @@ -171,7 +167,7 @@ func (w *WebSocketServer) removeTx(txID ids.ID, err error) error { func (w *WebSocketServer) setMinTx(t int64) error { expired := w.expiringTxs.SetMin(t) for _, id := range expired { - if err := w.removeTx(id, ErrExpired); err != nil { + if err := w.expireTx(id); err != nil { return err } } @@ -203,7 +199,7 @@ func (w *WebSocketServer) AcceptBlock(_ context.Context, b *chain.ExecutedBlock) continue } // Publish to tx listener - bytes, err := PackAcceptedTxMessage(txID, results[i]) + bytes, err := packTxMessage(txID, results[i]) if err != nil { return err } diff --git a/chain/result.go b/chain/result.go index 1295e69ee0..f6b24740dd 100644 --- a/chain/result.go +++ b/chain/result.go @@ -73,7 +73,15 @@ func (r *Result) Size() int { return consts.BoolLen + codec.BytesLen(r.Error) + outputSize + fees.DimensionsLen + consts.Uint64Len } -func (r *Result) Marshal(p *codec.Packer) error { +func (r *Result) Marshal() ([]byte, error) { + p := codec.NewWriter(r.Size(), consts.MaxInt) + if err := r.marshalInto(p); err != nil { + return nil, err + } + return p.Bytes(), p.Err() +} + +func (r *Result) marshalInto(p *codec.Packer) error { p.PackBool(r.Success) p.PackBytes(r.Error) p.PackByte(uint8(len(r.Outputs))) @@ -82,7 +90,7 @@ func (r *Result) Marshal(p *codec.Packer) error { } p.PackFixedBytes(r.Units.Bytes()) p.PackUint64(r.Fee) - return nil + return p.Err() } func MarshalResults(src []*Result) ([]byte, error) { @@ -90,14 +98,26 @@ func MarshalResults(src []*Result) ([]byte, error) { p := codec.NewWriter(size, consts.MaxInt) // could be much larger than [NetworkSizeLimit] p.PackInt(uint32(len(src))) for _, result := range src { - if err := result.Marshal(p); err != nil { + if err := result.marshalInto(p); err != nil { return nil, err } } return p.Bytes(), p.Err() } -func UnmarshalResult(p *codec.Packer) (*Result, error) { +func UnmarshalResult(src []byte) (*Result, error) { + p := codec.NewReader(src, consts.MaxInt) + result, err := unmarshalResultFrom(p) + if err != nil { + return nil, err + } + if !p.Empty() { + return nil, ErrInvalidObject + } + return result, nil +} + +func unmarshalResultFrom(p *codec.Packer) (*Result, error) { result := &Result{ Success: p.UnpackBool(), } @@ -127,7 +147,7 @@ func UnmarshalResults(src []byte) ([]*Result, error) { items := p.UnpackInt(false) results := make([]*Result, items) for i := uint32(0); i < items; i++ { - result, err := UnmarshalResult(p) + result, err := unmarshalResultFrom(p) if err != nil { return nil, err } diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/resolutions.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/resolutions.go index aab779fb89..e45d4f8efa 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/resolutions.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/resolutions.go @@ -37,17 +37,17 @@ func sendAndWait( } var result *chain.Result for { - txID, txErr, txResult, err := ws.ListenTx(ctx) + txID, txResult, err := ws.ListenTx(ctx) if err != nil { return false, ids.Empty, err } - if txErr != nil { - return false, ids.Empty, txErr - } if txID == tx.GetID() { result = txResult break } + if result == nil { + return false, ids.Empty, fmt.Errorf("tx %s expired", txID) + } utils.Outf("{{yellow}}skipping unexpected transaction:{{/}} %s\n", tx.GetID()) } if printStatus { diff --git a/internal/canoto/canoto.go b/internal/canoto/canoto.go new file mode 100644 index 0000000000..dd237112f8 --- /dev/null +++ b/internal/canoto/canoto.go @@ -0,0 +1,506 @@ +// Code generated by canoto. DO NOT EDIT. +// versions: +// canoto v0.10.0 + +// Canoto provides common functionality required for reading and writing the +// canoto format. +package canoto + +import ( + "encoding/binary" + "errors" + "io" + "math/bits" + "slices" + "unicode/utf8" + "unsafe" + + _ "embed" +) + +const ( + Varint WireType = iota + I64 + Len + _ // SGROUP is deprecated and not supported + _ // EGROUP is deprecated and not supported + I32 + + // SizeFint32 is the size of a 32-bit fixed size integer in bytes. + SizeFint32 = 4 + // SizeFint64 is the size of a 64-bit fixed size integer in bytes. + SizeFint64 = 8 + // SizeBool is the size of a boolean in bytes. + SizeBool = 1 + + // MaxFieldNumber is the maximum field number allowed to be used in a Tag. + MaxFieldNumber = 1<<29 - 1 + + // Version is the current version of the canoto library. + Version = "v0.10.0" + + wireTypeLength = 3 + wireTypeMask = 0x07 + + falseByte = 0 + trueByte = 1 + continuationMask = 0x80 +) + +var ( + // Code is the actual golang code for this library; including this comment. + // + // This variable is not used internally, so the compiler is smart enough to + // omit this value from the binary if the user of this library does not + // utilize this variable; at least at the time of writing. + // + // This can be used during codegen to generate this library. + // + //go:embed canoto.go + Code string + + ErrInvalidFieldOrder = errors.New("invalid field order") + ErrUnexpectedWireType = errors.New("unexpected wire type") + ErrDuplicateOneOf = errors.New("duplicate oneof field") + ErrInvalidLength = errors.New("decoded length is invalid") + ErrZeroValue = errors.New("zero value") + ErrUnknownField = errors.New("unknown field") + ErrPaddedZeroes = errors.New("padded zeroes") + + ErrOverflow = errors.New("overflow") + ErrInvalidWireType = errors.New("invalid wire type") + ErrInvalidBool = errors.New("decoded bool is neither true nor false") + ErrStringNotUTF8 = errors.New("decoded string is not UTF-8") +) + +type ( + Sint interface { + ~int8 | ~int16 | ~int32 | ~int64 + } + Uint interface { + ~uint8 | ~uint16 | ~uint32 | ~uint64 + } + Int interface{ Sint | Uint } + Int32 interface{ ~int32 | ~uint32 } + Int64 interface{ ~int64 | ~uint64 } + Bytes interface{ ~string | ~[]byte } + + // Message defines a type that can be a stand-alone Canoto message. + Message interface { + Field + // MarshalCanoto returns the Canoto representation of this message. + // + // It is assumed that this message is ValidCanoto. + MarshalCanoto() []byte + // UnmarshalCanoto unmarshals a Canoto-encoded byte slice into the message. + UnmarshalCanoto(bytes []byte) error + } + + // Field defines a type that can be included inside of a Canoto message. + Field interface { + // MarshalCanotoInto writes the field into a canoto.Writer and returns + // the resulting canoto.Writer. + // + // It is assumed that CalculateCanotoCache has been called since the + // last modification to this field. + // + // It is assumed that this field is ValidCanoto. + MarshalCanotoInto(w Writer) Writer + // CalculateCanotoCache populates internal caches based on the current + // values in the struct. + CalculateCanotoCache() + // CachedCanotoSize returns the previously calculated size of the Canoto + // representation from CalculateCanotoCache. + // + // If CalculateCanotoCache has not yet been called, or the field has + // been modified since the last call to CalculateCanotoCache, the + // returned size may be incorrect. + CachedCanotoSize() int + // UnmarshalCanotoFrom populates the field from a canoto.Reader. + UnmarshalCanotoFrom(r Reader) error + // ValidCanoto validates that the field can be correctly marshaled into + // the Canoto format. + ValidCanoto() bool + } + + // FieldPointer is a pointer to a concrete Field value T. + // + // This type must be used when implementing a value for a generic Field. + FieldPointer[T any] interface { + Field + *T + } + + // FieldMaker is a Field that can create a new value of type T. + // + // The returned value must be able to be unmarshaled into. + // + // This type can be used when implementing a generic Field. However, if T is + // an interface, it is possible for generated code to compile and panic at + // runtime. + FieldMaker[T any] interface { + Field + MakeCanoto() T + } + + // WireType represents the Proto wire description of a field. Within Proto + // it is used to provide forwards compatibility. For Canoto, it exists to + // provide compatibility with Proto. + WireType byte + + // Reader contains all the state needed to unmarshal a Canoto type. + // + // The functions in this package are not methods on the Reader type to + // enable the usage of generics. + Reader struct { + B []byte + Unsafe bool + } + + // Writer contains all the state needed to marshal a Canoto type. + // + // The functions in this package are not methods on the Writer type to + // enable the usage of generics. + Writer struct { + B []byte + } +) + +func (w WireType) IsValid() bool { + switch w { + case Varint, I64, Len, I32: + return true + default: + return false + } +} + +func (w WireType) String() string { + switch w { + case Varint: + return "Varint" + case I64: + return "I64" + case Len: + return "Len" + case I32: + return "I32" + default: + return "Invalid" + } +} + +// HasNext returns true if there are more bytes to read. +func HasNext(r *Reader) bool { + return len(r.B) > 0 +} + +// Append writes unprefixed bytes to the writer. +func Append[T Bytes](w *Writer, v T) { + w.B = append(w.B, v...) +} + +// Tag calculates the tag for a field number and wire type. +// +// This function should not typically be used during marshaling, as tags can be +// precomputed. +func Tag(fieldNumber uint32, wireType WireType) []byte { + w := Writer{} + AppendInt(&w, fieldNumber<> wireTypeLength, wireType, nil +} + +// SizeInt calculates the size of an integer when encoded as a varint. +func SizeInt[T Int](v T) int { + if v == 0 { + return 1 + } + return (bits.Len64(uint64(v)) + 6) / 7 +} + +// CountInts counts the number of varints that are encoded in bytes. +func CountInts(bytes []byte) int { + var count int + for _, b := range bytes { + if b < continuationMask { + count++ + } + } + return count +} + +// ReadInt reads a varint encoded integer from the reader. +func ReadInt[T Int](r *Reader, v *T) error { + val, bytesRead := binary.Uvarint(r.B) + switch { + case bytesRead == 0: + return io.ErrUnexpectedEOF + case bytesRead < 0 || uint64(T(val)) != val: + return ErrOverflow + // To ensure decoding is canonical, we check for padded zeroes in the + // varint. + // The last byte of the varint includes the most significant bits. + // If the last byte is 0, then the number should have been encoded more + // efficiently by removing this zero. + case bytesRead > 1 && r.B[bytesRead-1] == 0x00: + return ErrPaddedZeroes + default: + r.B = r.B[bytesRead:] + *v = T(val) + return nil + } +} + +// AppendInt writes an integer to the writer as a varint. +func AppendInt[T Int](w *Writer, v T) { + w.B = binary.AppendUvarint(w.B, uint64(v)) +} + +// SizeSint calculates the size of an integer when zigzag encoded as a varint. +func SizeSint[T Sint](v T) int { + if v == 0 { + return 1 + } + + var uv uint64 + if v > 0 { + uv = uint64(v) << 1 + } else { + uv = ^uint64(v)<<1 | 1 + } + return (bits.Len64(uv) + 6) / 7 +} + +// ReadSint reads a zigzag encoded integer from the reader. +func ReadSint[T Sint](r *Reader, v *T) error { + var largeVal uint64 + if err := ReadInt(r, &largeVal); err != nil { + return err + } + + uVal := largeVal >> 1 + val := T(uVal) + // If T is an int32, it's possible that some bits were truncated during the + // cast. In this case, casting back to uint64 would result in a different + // value. + if uint64(val) != uVal { + return ErrOverflow + } + + if largeVal&1 != 0 { + val = ^val + } + *v = val + return nil +} + +// AppendSint writes an integer to the writer as a zigzag encoded varint. +func AppendSint[T Sint](w *Writer, v T) { + if v >= 0 { + w.B = binary.AppendUvarint(w.B, uint64(v)<<1) + } else { + w.B = binary.AppendUvarint(w.B, ^uint64(v)<<1|1) + } +} + +// ReadFint32 reads a 32-bit fixed size integer from the reader. +func ReadFint32[T Int32](r *Reader, v *T) error { + if len(r.B) < SizeFint32 { + return io.ErrUnexpectedEOF + } + + *v = T(binary.LittleEndian.Uint32(r.B)) + r.B = r.B[SizeFint32:] + return nil +} + +// AppendFint32 writes a 32-bit fixed size integer to the writer. +func AppendFint32[T Int32](w *Writer, v T) { + w.B = binary.LittleEndian.AppendUint32(w.B, uint32(v)) +} + +// ReadFint64 reads a 64-bit fixed size integer from the reader. +func ReadFint64[T Int64](r *Reader, v *T) error { + if len(r.B) < SizeFint64 { + return io.ErrUnexpectedEOF + } + + *v = T(binary.LittleEndian.Uint64(r.B)) + r.B = r.B[SizeFint64:] + return nil +} + +// AppendFint64 writes a 64-bit fixed size integer to the writer. +func AppendFint64[T Int64](w *Writer, v T) { + w.B = binary.LittleEndian.AppendUint64(w.B, uint64(v)) +} + +// ReadBool reads a boolean from the reader. +func ReadBool[T ~bool](r *Reader, v *T) error { + switch { + case len(r.B) < SizeBool: + return io.ErrUnexpectedEOF + case r.B[0] > trueByte: + return ErrInvalidBool + default: + *v = r.B[0] == trueByte + r.B = r.B[SizeBool:] + return nil + } +} + +// AppendBool writes a boolean to the writer. +func AppendBool[T ~bool](w *Writer, b T) { + if b { + w.B = append(w.B, trueByte) + } else { + w.B = append(w.B, falseByte) + } +} + +// SizeBytes calculates the size the length-prefixed bytes would take if +// written. +func SizeBytes[T Bytes](v T) int { + return SizeInt(int64(len(v))) + len(v) +} + +// CountBytes counts the consecutive number of length-prefixed fields with the +// given tag. +func CountBytes(bytes []byte, tag string) (int, error) { + var ( + r = Reader{B: bytes} + count = 0 + ) + for HasPrefix(r.B, tag) { + r.B = r.B[len(tag):] + var length int64 + if err := ReadInt(&r, &length); err != nil { + return 0, err + } + if length < 0 { + return 0, ErrInvalidLength + } + if length > int64(len(r.B)) { + return 0, io.ErrUnexpectedEOF + } + r.B = r.B[length:] + count++ + } + return count, nil +} + +// HasPrefix returns true if the bytes start with the given prefix. +func HasPrefix(bytes []byte, prefix string) bool { + return len(bytes) >= len(prefix) && string(bytes[:len(prefix)]) == prefix +} + +// ReadString reads a string from the reader. The string is verified to be valid +// UTF-8. +func ReadString[T ~string](r *Reader, v *T) error { + var length int64 + if err := ReadInt[int64](r, &length); err != nil { + return err + } + if length < 0 { + return ErrInvalidLength + } + if length > int64(len(r.B)) { + return io.ErrUnexpectedEOF + } + + bytes := r.B[:length] + if !utf8.Valid(bytes) { + return ErrStringNotUTF8 + } + + r.B = r.B[length:] + if r.Unsafe { + *v = T(unsafeString(bytes)) + } else { + *v = T(bytes) + } + return nil +} + +// ReadBytes reads a byte slice from the reader. +func ReadBytes[T ~[]byte](r *Reader, v *T) error { + var length int64 + if err := ReadInt[int64](r, &length); err != nil { + return err + } + if length < 0 { + return ErrInvalidLength + } + if length > int64(len(r.B)) { + return io.ErrUnexpectedEOF + } + + bytes := r.B[:length] + r.B = r.B[length:] + if !r.Unsafe { + bytes = slices.Clone(bytes) + } + *v = T(bytes) + return nil +} + +// AppendBytes writes a length-prefixed byte slice to the writer. +func AppendBytes[T Bytes](w *Writer, v T) { + AppendInt(w, int64(len(v))) + w.B = append(w.B, v...) +} + +// MakePointer creates a new pointer. It is equivalent to `new(T)`. +// +// This function is useful to use in auto-generated code, when the type of a +// variable is unknown. For example, if we have a variable `v` which we know to +// be a pointer, but we do not know the type of the pointer, we can use this +// function to leverage golang's type inference to create the new pointer. +func MakePointer[T any](_ *T) *T { + return new(T) +} + +// MakeSlice creates a new slice with the given length. It is equivalent to +// `make([]T, length)`. +// +// This function is useful to use in auto-generated code, when the type of a +// variable is unknown. For example, if we have a variable `v` which we know to +// be a slice, but we do not know the type of the elements, we can use this +// function to leverage golang's type inference to create the new slice. +func MakeSlice[T any](_ []T, length int) []T { + return make([]T, length) +} + +// Zero returns the zero value for its type. +func Zero[T any](_ T) (_ T) { + return +} + +// IsZero returns true if the value is the zero value for its type. +func IsZero[T comparable](v T) bool { + var zero T + return v == zero +} + +// unsafeString converts a []byte to an unsafe string. +// +// Invariant: The input []byte must not be modified. +func unsafeString(b []byte) string { + // avoid copying during the conversion + return unsafe.String(unsafe.SliceData(b), len(b)) +} diff --git a/pubsub/connection.go b/pubsub/connection.go index 4cd1631503..871c260024 100644 --- a/pubsub/connection.go +++ b/pubsub/connection.go @@ -101,7 +101,7 @@ func (c *Connection) readPump() { ) return } - msgs, err := ParseBatchMessage(c.s.config.MaxReadMessageSize, responseBytes) + msgs, err := ParseBatchMessage(responseBytes) if err != nil { c.s.log.Debug("unable to read websockets message", zap.Error(err), diff --git a/pubsub/message_buffer.go b/pubsub/message_buffer.go index 386a615c85..02bc07cf8d 100644 --- a/pubsub/message_buffer.go +++ b/pubsub/message_buffer.go @@ -10,9 +10,6 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/timer" "go.uber.org/zap" - - "github.com/ava-labs/hypersdk/codec" - "github.com/ava-labs/hypersdk/consts" ) type MessageBuffer struct { @@ -83,10 +80,7 @@ func (m *MessageBuffer) Close() error { } func (m *MessageBuffer) clearPending() error { - bm, err := CreateBatchMessage(m.maxSize, m.pending) - if err != nil { - return err - } + bm := CreateBatchMessage(m.pending) select { case m.Queue <- bm: default: @@ -126,31 +120,3 @@ func (m *MessageBuffer) Send(msg []byte) error { } return nil } - -func CreateBatchMessage(maxSize int, msgs [][]byte) ([]byte, error) { - size := consts.IntLen - for _, msg := range msgs { - size += codec.BytesLen(msg) - } - msgBatch := codec.NewWriter(size, maxSize) - msgBatch.PackInt(uint32(len(msgs))) - for _, msg := range msgs { - msgBatch.PackBytes(msg) - } - return msgBatch.Bytes(), msgBatch.Err() -} - -func ParseBatchMessage(maxSize int, msg []byte) ([][]byte, error) { - msgBatch := codec.NewReader(msg, maxSize) - msgLen := msgBatch.UnpackInt(true) - msgs := [][]byte{} - for i := uint32(0); i < msgLen; i++ { - var nextMsg []byte - msgBatch.UnpackBytes(-1, true, &nextMsg) - if err := msgBatch.Err(); err != nil { - return nil, err - } - msgs = append(msgs, nextMsg) - } - return msgs, msgBatch.Err() -} diff --git a/pubsub/messages.canoto.go b/pubsub/messages.canoto.go new file mode 100644 index 0000000000..bfda5ab4d9 --- /dev/null +++ b/pubsub/messages.canoto.go @@ -0,0 +1,192 @@ +// Code generated by canoto. DO NOT EDIT. +// versions: +// canoto v0.10.0 +// source: pubsub/messages.go + +package pubsub + +import ( + "io" + "sync/atomic" + "unicode/utf8" + + "github.com/ava-labs/hypersdk/internal/canoto" +) + +// Ensure that unused imports do not error +var ( + _ atomic.Int64 + + _ = io.ErrUnexpectedEOF + _ = utf8.ValidString +) + +const ( + canoto__BatchMessage__Messages__tag = "\x0a" // canoto.Tag(1, canoto.Len) +) + +type canotoData_BatchMessage struct { + // Enforce noCopy before atomic usage. + // See https://github.com/StephenButtolph/canoto/pull/32 + _ atomic.Int64 + + size int +} + +// MakeCanoto creates a new empty value. +func (*BatchMessage) MakeCanoto() *BatchMessage { + return new(BatchMessage) +} + +// UnmarshalCanoto unmarshals a Canoto-encoded byte slice into the struct. +// +// OneOf fields are cached during the unmarshaling process. +// +// The struct is not cleared before unmarshaling, any fields not present in the +// bytes will retain their previous values. If a OneOf field was previously +// cached as being set, attempting to unmarshal that OneOf again will return +// canoto.ErrDuplicateOneOf. +func (c *BatchMessage) UnmarshalCanoto(bytes []byte) error { + r := canoto.Reader{ + B: bytes, + } + return c.UnmarshalCanotoFrom(r) +} + +// UnmarshalCanotoFrom populates the struct from a canoto.Reader. Most users +// should just use UnmarshalCanoto. +// +// OneOf fields are cached during the unmarshaling process. +// +// The struct is not cleared before unmarshaling, any fields not present in the +// bytes will retain their previous values. If a OneOf field was previously +// cached as being set, attempting to unmarshal that OneOf again will return +// canoto.ErrDuplicateOneOf. +// +// This function enables configuration of reader options. +func (c *BatchMessage) UnmarshalCanotoFrom(r canoto.Reader) error { + var minField uint32 + for canoto.HasNext(&r) { + field, wireType, err := canoto.ReadTag(&r) + if err != nil { + return err + } + if field < minField { + return canoto.ErrInvalidFieldOrder + } + + switch field { + case 1: + if wireType != canoto.Len { + return canoto.ErrUnexpectedWireType + } + + remainingBytes := r.B + originalUnsafe := r.Unsafe + r.Unsafe = true + err := canoto.ReadBytes(&r, new([]byte)) + r.Unsafe = originalUnsafe + if err != nil { + return err + } + + count, err := canoto.CountBytes(r.B, canoto__BatchMessage__Messages__tag) + if err != nil { + return err + } + c.Messages = canoto.MakeSlice(c.Messages, 1+count) + + r.B = remainingBytes + if err := canoto.ReadBytes(&r, &c.Messages[0]); err != nil { + return err + } + for i := range count { + r.B = r.B[len(canoto__BatchMessage__Messages__tag):] + if err := canoto.ReadBytes(&r, &c.Messages[1+i]); err != nil { + return err + } + } + default: + return canoto.ErrUnknownField + } + + minField = field + 1 + } + return nil +} + +// ValidCanoto validates that the struct can be correctly marshaled into the +// Canoto format. +// +// Specifically, ValidCanoto ensures: +// 1. All OneOfs are specified at most once. +// 2. All strings are valid utf-8. +// 3. All custom fields are ValidCanoto. +func (c *BatchMessage) ValidCanoto() bool { + if c == nil { + return true + } + return true +} + +// CalculateCanotoCache populates size and OneOf caches based on the current +// values in the struct. +// +// It is not safe to call this function concurrently. +func (c *BatchMessage) CalculateCanotoCache() { + if c == nil { + return + } + c.canotoData.size = 0 + for _, v := range c.Messages { + c.canotoData.size += len(canoto__BatchMessage__Messages__tag) + canoto.SizeBytes(v) + } +} + +// CachedCanotoSize returns the previously calculated size of the Canoto +// representation from CalculateCanotoCache. +// +// If CalculateCanotoCache has not yet been called, it will return 0. +// +// If the struct has been modified since the last call to CalculateCanotoCache, +// the returned size may be incorrect. +func (c *BatchMessage) CachedCanotoSize() int { + if c == nil { + return 0 + } + return c.canotoData.size +} + +// MarshalCanoto returns the Canoto representation of this struct. +// +// It is assumed that this struct is ValidCanoto. +// +// It is not safe to call this function concurrently. +func (c *BatchMessage) MarshalCanoto() []byte { + c.CalculateCanotoCache() + w := canoto.Writer{ + B: make([]byte, 0, c.CachedCanotoSize()), + } + w = c.MarshalCanotoInto(w) + return w.B +} + +// MarshalCanotoInto writes the struct into a canoto.Writer and returns the +// resulting canoto.Writer. Most users should just use MarshalCanoto. +// +// It is assumed that CalculateCanotoCache has been called since the last +// modification to this struct. +// +// It is assumed that this struct is ValidCanoto. +// +// It is not safe to call this function concurrently. +func (c *BatchMessage) MarshalCanotoInto(w canoto.Writer) canoto.Writer { + if c == nil { + return w + } + for _, v := range c.Messages { + canoto.Append(&w, canoto__BatchMessage__Messages__tag) + canoto.AppendBytes(&w, v) + } + return w +} diff --git a/pubsub/messages.go b/pubsub/messages.go new file mode 100644 index 0000000000..160010dd6c --- /dev/null +++ b/pubsub/messages.go @@ -0,0 +1,23 @@ +// Copyright (C) 2024, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package pubsub + +type BatchMessage struct { + Messages [][]byte `canoto:"repeated bytes,1"` + + canotoData canotoData_BatchMessage +} + +func CreateBatchMessage(msgs [][]byte) []byte { + batchMessage := BatchMessage{Messages: msgs} + return batchMessage.MarshalCanoto() +} + +func ParseBatchMessage(msg []byte) ([][]byte, error) { + batchMessage := BatchMessage{} + if err := batchMessage.UnmarshalCanoto(msg); err != nil { + return nil, err + } + return batchMessage.Messages, nil +} diff --git a/pubsub/server_test.go b/pubsub/server_test.go index b0e6d860cf..931ed1fc23 100644 --- a/pubsub/server_test.go +++ b/pubsub/server_test.go @@ -16,8 +16,6 @@ import ( "github.com/ava-labs/avalanchego/utils/logging" "github.com/gorilla/websocket" "github.com/stretchr/testify/require" - - "github.com/ava-labs/hypersdk/consts" ) const dummyAddr = "localhost:8080" @@ -93,7 +91,7 @@ func TestServerPublish(t *testing.T) { // Receive the message from the publish _, batchMsg, err := webCon.ReadMessage() require.NoError(err, "Error receiveing message.") - msgs, err := ParseBatchMessage(MaxWriteMessageSize, batchMsg) + msgs, err := ParseBatchMessage(batchMsg) require.NoError(err, "Error parsing message.") require.Len(msgs, 1) // Verify that the received message is the expected dummy message @@ -160,8 +158,7 @@ func TestServerRead(t *testing.T) { require.NoError(err, "Error connecting to the server.") defer resp.Body.Close() id := ids.GenerateTestID() - batchMsg, err := CreateBatchMessage(consts.NetworkSizeLimit, [][]byte{id[:]}) - require.NoError(err) + batchMsg := CreateBatchMessage([][]byte{id[:]}) err = webCon.WriteMessage(websocket.TextMessage, batchMsg) require.NoError(err, "Error writing message to server.") // Wait for callback to be called @@ -290,7 +287,7 @@ func TestServerPublishSpecific(t *testing.T) { // Receive the message from the publish _, batchMsg, err := webCon1.ReadMessage() require.NoError(err, "Error reading to connection.") - msgs, err := ParseBatchMessage(MaxWriteMessageSize, batchMsg) + msgs, err := ParseBatchMessage(batchMsg) require.NoError(err, "Error parsing message.") require.Len(msgs, 1) // Verify that the received message is the expected dummy message diff --git a/throughput/issuer.go b/throughput/issuer.go index f748421fa1..ed5c2ec18c 100644 --- a/throughput/issuer.go +++ b/throughput/issuer.go @@ -38,7 +38,7 @@ func (i *issuer) Start(ctx context.Context) { i.tracker.issuerWg.Add(1) go func() { for { - _, wsErr, result, err := i.ws.ListenTx(context.TODO()) + txID, result, err := i.ws.ListenTx(context.TODO()) if err != nil { return } @@ -46,7 +46,7 @@ func (i *issuer) Start(ctx context.Context) { i.outstandingTxs-- i.l.Unlock() i.tracker.inflight.Add(-1) - i.tracker.logResult(result, wsErr) + i.tracker.logResult(txID, result) } }() go func() { diff --git a/throughput/pacer.go b/throughput/pacer.go index fb2d7842e1..5e1815f62d 100644 --- a/throughput/pacer.go +++ b/throughput/pacer.go @@ -25,13 +25,13 @@ func (p *pacer) Run(ctx context.Context, max int) { // Start a goroutine to process transaction results for range p.inflight { - _, wsErr, result, err := p.ws.ListenTx(ctx) + txID, result, err := p.ws.ListenTx(ctx) if err != nil { - p.done <- fmt.Errorf("error listening to tx: %w", err) + p.done <- fmt.Errorf("error listening to tx %s: %w", txID, err) return } - if wsErr != nil { - p.done <- fmt.Errorf("websocket error: %w", wsErr) + if result == nil { + p.done <- fmt.Errorf("tx %s expired", txID) return } if !result.Success { diff --git a/throughput/tracker.go b/throughput/tracker.go index ee5c15e196..d0d8f8c737 100644 --- a/throughput/tracker.go +++ b/throughput/tracker.go @@ -5,13 +5,13 @@ package throughput import ( "context" - "strings" "sync" "sync/atomic" "time" + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/hypersdk/api/jsonrpc" - "github.com/ava-labs/hypersdk/api/ws" "github.com/ava-labs/hypersdk/chain" "github.com/ava-labs/hypersdk/utils" ) @@ -27,25 +27,22 @@ type tracker struct { sent atomic.Int64 } -func (t *tracker) logResult( - result *chain.Result, - wsErr error, -) { +// logResult logs the result of a transaction received over the websocket connection +func (t *tracker) logResult(txID ids.ID, result *chain.Result) { t.l.Lock() - if result != nil { - if result.Success { - t.confirmedTxs++ - } else { - utils.Outf("{{orange}}on-chain tx failure:{{/}} %s %t\n", string(result.Error), result.Success) - } + defer t.l.Unlock() + + t.totalTxs++ + if result == nil { + utils.Outf("{{orange}}transaction %s expired\n", txID) + return + } + + if result.Success { + t.confirmedTxs++ } else { - // We can't error match here because we receive it over the wire. - if !strings.Contains(wsErr.Error(), ws.ErrExpired.Error()) { - utils.Outf("{{orange}}pre-execute tx failure:{{/}} %v\n", wsErr) - } + utils.Outf("{{orange}}on-chain tx failure %s:{{/}} %s %t\n", txID, string(result.Error), result.Success) } - t.totalTxs++ - t.l.Unlock() } func (t *tracker) logState(ctx context.Context, cli *jsonrpc.JSONRPCClient) { diff --git a/vm/vm_test.go b/vm/vm_test.go index 44b3c6c777..4f368732e3 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -615,9 +615,8 @@ func TestWebsocketAPI(t *testing.T) { wsBlk, wsResults, wsUnitPrices, err := client.ListenBlock(ctx, network.VMs[0].VM) r.NoError(err) - txID, txErr, res, unpackErr := client.ListenTx(ctx) + txID, res, unpackErr := client.ListenTx(ctx) r.NoError(unpackErr) - r.NoError(txErr) r.Equal(tx.GetID(), txID) r.True(res.Success)