diff --git a/cmd/beekeeper/cmd/cluster.go b/cmd/beekeeper/cmd/cluster.go index c6f24a08f..e1236b7ba 100644 --- a/cmd/beekeeper/cmd/cluster.go +++ b/cmd/beekeeper/cmd/cluster.go @@ -44,7 +44,7 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co cluster.AddNodeGroup(ngName, ngConfig.Export()) // delete nodes from the node group - g, err := cluster.NodeGroup(ngName) + ng, err := cluster.NodeGroup(ngName) if err != nil { return fmt.Errorf("get node group: %w", err) } @@ -54,7 +54,7 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co if len(v.Nodes[i].Name) > 0 { nName = v.Nodes[i].Name } - if err := g.DeleteNode(ctx, nName); err != nil { + if err := ng.DeleteNode(ctx, nName); err != nil { return fmt.Errorf("deleting node %s from the node group %s: %w", nName, ngName, err) } @@ -201,6 +201,7 @@ func initializeCluster(clusterConfig config.Cluster, c *command) orchestration.C clusterOpts := clusterConfig.Export() clusterOpts.SwapClient = c.swapClient clusterOpts.K8SClient = c.k8sClient + clusterOpts.HTTPClient = c.httpClient return orchestrationK8S.NewCluster(clusterConfig.GetName(), clusterOpts, c.log) } @@ -313,7 +314,7 @@ func setupOrAddNode(ctx context.Context, beeOpt orchestration.BeeClientOption, ) { if startCluster { - ethAddress, err := ng.SetupNode(ctx, nodeName, inCluster, nodeOpts) + ethAddress, err := ng.DeployNode(ctx, nodeName, inCluster, nodeOpts) ch <- nodeResult{ ethAddress: ethAddress, err: err, @@ -326,17 +327,11 @@ func setupOrAddNode(ctx context.Context, } func setupNodeOptions(node config.ClusterNode, bConfig *orchestration.Config) orchestration.NodeOptions { - nOptions := orchestration.NodeOptions{ - Config: bConfig, - } - - if len(node.LibP2PKey) > 0 { - nOptions.LibP2PKey = node.LibP2PKey - } - if len(node.SwarmKey) > 0 { - nOptions.SwarmKey = orchestration.EncryptedKey(node.SwarmKey) + return orchestration.NodeOptions{ + Config: bConfig, + LibP2PKey: node.LibP2PKey, + SwarmKey: orchestration.EncryptedKey(node.SwarmKey), } - return nOptions } func fund( diff --git a/cmd/beekeeper/cmd/cmd.go b/cmd/beekeeper/cmd/cmd.go index 501f2dd95..8807838e1 100644 --- a/cmd/beekeeper/cmd/cmd.go +++ b/cmd/beekeeper/cmd/cmd.go @@ -5,12 +5,15 @@ import ( "errors" "fmt" "io" + "net/http" "net/url" "os" "path/filepath" "strings" + "time" "github.com/ethersphere/beekeeper/pkg/config" + "github.com/ethersphere/beekeeper/pkg/httpx" "github.com/ethersphere/beekeeper/pkg/k8s" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/scheduler" @@ -18,7 +21,7 @@ import ( "github.com/go-git/go-billy/v5/memfs" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" - "github.com/go-git/go-git/v5/plumbing/transport/http" + httptransport "github.com/go-git/go-git/v5/plumbing/transport/http" "github.com/go-git/go-git/v5/storage/memory" "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -61,7 +64,8 @@ type command struct { globalConfigFile string homeDir string config *config.Config // beekeeper clusters configuration (config dir) - k8sClient *k8s.Client // kubernetes client + httpClient *http.Client + k8sClient *k8s.Client // kubernetes client swapClient swap.Client log logging.Logger } @@ -79,6 +83,12 @@ func newCommand(opts ...option) (c *command, err error) { return c.initConfig(cmd.Flags().Changed(optionNameClusterName)) }, }, + httpClient: &http.Client{ + Transport: &httpx.HeaderRoundTripper{ + Next: http.DefaultTransport, + }, + Timeout: 30 * time.Second, + }, } for _, o := range opts { @@ -248,7 +258,7 @@ func (c *command) initLogger() error { verbosity := c.globalConfig.GetString(optionNameLogVerbosity) lokiEndpoint := c.globalConfig.GetString(optionNameLokiEndpoint) - log, err := newLogger(c.root, verbosity, lokiEndpoint) + log, err := newLogger(c.root, verbosity, lokiEndpoint, c.httpClient) if err != nil { return fmt.Errorf("new logger: %w", err) } @@ -263,7 +273,7 @@ func (c *command) loadConfigDirectory() error { // read configuration from git repo fs := memfs.New() if _, err := git.Clone(memory.NewStorage(), fs, &git.CloneOptions{ - Auth: &http.BasicAuth{ + Auth: &httptransport.BasicAuth{ Username: c.globalConfig.GetString(optionNameConfigGitUsername), Password: c.globalConfig.GetString(optionNameConfigGitPassword), }, @@ -429,6 +439,7 @@ func (c *command) setSwapClient() (err error) { c.swapClient = swap.NewGethClient(gethUrl, &swap.GethClientOptions{ BzzTokenAddress: c.globalConfig.GetString("bzz-token-address"), EthAccount: c.globalConfig.GetString("eth-account"), + HTTPClient: c.httpClient, }, c.log) } else { c.swapClient = &swap.NotSet{} @@ -437,10 +448,10 @@ func (c *command) setSwapClient() (err error) { return } -func newLogger(cmd *cobra.Command, verbosity, lokiEndpoint string) (logging.Logger, error) { +func newLogger(cmd *cobra.Command, verbosity, lokiEndpoint string, httpClient *http.Client) (logging.Logger, error) { var logger logging.Logger opts := []logging.LoggerOption{ - logging.WithLokiOption(lokiEndpoint), + logging.WithLokiOption(lokiEndpoint, httpClient), logging.WithMetricsOption(), } diff --git a/config/config.yaml b/config/config.yaml index 4e98b1e3d..0b6c91840 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -194,6 +194,7 @@ checks: options: cache-size: 10 reserve-size: 16 + postage-label: gc-check timeout: 5m type: gc kademlia: @@ -236,6 +237,7 @@ checks: type: postage timeout: 5m options: + postage-label: postage-check postage-amount: 1000 postage-depth: 17 postage-topup-amount: 100 diff --git a/config/local.yaml b/config/local.yaml index 84aa8f5a8..7b01a2592 100644 --- a/config/local.yaml +++ b/config/local.yaml @@ -196,6 +196,7 @@ checks: options: cache-size: 10 reserve-size: 16 + postage-label: gc-check timeout: 5m type: gc ci-manifest: @@ -311,6 +312,7 @@ checks: type: postage timeout: 5m options: + postage-label: postage-check postage-amount: 1000 postage-depth: 17 postage-topup-amount: 100 diff --git a/config/public-testnet.yaml b/config/public-testnet.yaml index 57c101467..d11414885 100644 --- a/config/public-testnet.yaml +++ b/config/public-testnet.yaml @@ -123,6 +123,7 @@ checks: type: postage timeout: 30m options: + postage-label: postage-check postage-amount: 140000000 postage-depth: 17 postage-topup-amount: 100 diff --git a/pkg/bee/api/api.go b/pkg/bee/api/api.go index c8b7dae6c..9797c10c1 100644 --- a/pkg/bee/api/api.go +++ b/pkg/bee/api/api.go @@ -11,9 +11,6 @@ import ( "net/url" "strconv" "strings" - - "github.com/ethersphere/bee/v2/pkg/swarm" - "github.com/ethersphere/beekeeper" ) const ( @@ -38,11 +35,10 @@ const ( swarmErrorDocumentHeader = "Swarm-Error-Document" ) -var userAgent = "beekeeper/" + beekeeper.Version - // Client manages communication with the Bee API. type Client struct { httpClient *http.Client // HTTP client must handle authentication implicitly. + apiURL *url.URL // Base URL for API requests. service service // Reuse a single struct instead of allocating one for each service on the heap. // Services that API provides. @@ -70,22 +66,24 @@ type ClientOptions struct { } // NewClient constructs a new Client. -func NewClient(baseURL *url.URL, o *ClientOptions) (c *Client) { - if o == nil { - o = new(ClientOptions) +func NewClient(apiURL *url.URL, httpClient *http.Client) (*Client, error) { + if httpClient == nil { + httpClient = &http.Client{} } - if o.HTTPClient == nil { - o.HTTPClient = new(http.Client) + if apiURL == nil { + return nil, errors.New("baseURL is required") } - c = newClient(httpClientWithTransport(baseURL, o.HTTPClient)) - return + return newClient(apiURL, httpClient), nil } // newClient constructs a new *Client with the provided http Client, which // should handle authentication implicitly, and sets all API services. -func newClient(httpClient *http.Client) (c *Client) { - c = &Client{httpClient: httpClient} +func newClient(apiURL *url.URL, httpClient *http.Client) (c *Client) { + c = &Client{ + httpClient: httpClient, + apiURL: apiURL, + } c.service.client = c c.Act = (*ActService)(&c.service) @@ -108,32 +106,6 @@ func newClient(httpClient *http.Client) (c *Client) { return c } -func httpClientWithTransport(baseURL *url.URL, c *http.Client) *http.Client { - if c == nil { - c = new(http.Client) - } - - transport := c.Transport - if transport == nil { - transport = http.DefaultTransport - } - - if !strings.HasSuffix(baseURL.Path, "/") { - baseURL.Path += "/" - } - - c.Transport = roundTripperFunc(func(r *http.Request) (resp *http.Response, err error) { - r.Header.Set("User-Agent", userAgent) - u, err := baseURL.Parse(r.URL.String()) - if err != nil { - return nil, err - } - r.URL = u - return transport.RoundTrip(r) - }) - return c -} - // requestJSON handles the HTTP request response cycle. It JSON encodes the request // body, creates an HTTP request with provided method on a path with required // headers and decodes request body if the v argument is not nil and content type is @@ -150,9 +122,22 @@ func (c *Client) requestJSON(ctx context.Context, method, path string, body, v i return c.request(ctx, method, path, bodyBuffer, v) } +func (c *Client) getFullURL(path string) (string, error) { + rel, err := url.Parse(path) + if err != nil { + return "", fmt.Errorf("failed to parse path: %w", err) + } + return c.apiURL.ResolveReference(rel).String(), nil +} + // request handles the HTTP request response cycle. func (c *Client) request(ctx context.Context, method, path string, body io.Reader, v interface{}) (err error) { - req, err := http.NewRequest(method, path, body) + fullURL, err := c.getFullURL(path) + if err != nil { + return err + } + + req, err := http.NewRequest(method, fullURL, body) if err != nil { return err } @@ -196,7 +181,12 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R // requestDataGetHeader handles the HTTP request response cycle and returns the response body and header. func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, h http.Header, err error) { - req, err := http.NewRequest(method, path, body) + fullURL, err := c.getFullURL(path) + if err != nil { + return nil, nil, err + } + + req, err := http.NewRequest(method, fullURL, body) if err != nil { return nil, nil, err } @@ -245,7 +235,12 @@ func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, // requestWithHeader handles the HTTP request response cycle. func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}, headerParser ...func(http.Header)) (err error) { - req, err := http.NewRequest(method, path, body) + fullURL, err := c.getFullURL(path) + if err != nil { + return err + } + + req, err := http.NewRequest(method, fullURL, body) if err != nil { return err } @@ -324,40 +319,3 @@ func responseErrorHandler(r *http.Response) (err error) { type service struct { client *Client } - -// Bool is a helper routine that allocates a new bool value to store v and -// returns a pointer to it. -func Bool(v bool) (p *bool) { return &v } - -// roundTripperFunc type is an adapter to allow the use of ordinary functions as -// http.RoundTripper interfaces. If f is a function with the appropriate -// signature, roundTripperFunc(f) is a http.RoundTripper that calls f. -type roundTripperFunc func(*http.Request) (*http.Response, error) - -// RoundTrip calls f(r). -func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { - return f(r) -} - -type UploadOptions struct { - Act bool - Pin bool - Tag uint64 - BatchID string - Direct bool - ActHistoryAddress swarm.Address - - // Dirs - IndexDocument string - ErrorDocument string -} - -type DownloadOptions struct { - Act *bool - ActHistoryAddress *swarm.Address - ActPublicKey *swarm.Address - ActTimestamp *uint64 - Cache *bool - RedundancyFallbackMode *bool - OnlyRootChunk *bool -} diff --git a/pkg/bee/api/options.go b/pkg/bee/api/options.go new file mode 100644 index 000000000..17331ec4a --- /dev/null +++ b/pkg/bee/api/options.go @@ -0,0 +1,26 @@ +package api + +import "github.com/ethersphere/bee/v2/pkg/swarm" + +type UploadOptions struct { + Act bool + Pin bool + Tag uint64 + BatchID string + Direct bool + ActHistoryAddress swarm.Address + + // Dirs + IndexDocument string + ErrorDocument string +} + +type DownloadOptions struct { + Act *bool + ActHistoryAddress *swarm.Address + ActPublicKey *swarm.Address + ActTimestamp *uint64 + Cache *bool + RedundancyFallbackMode *bool + OnlyRootChunk *bool +} diff --git a/pkg/bee/client.go b/pkg/bee/client.go index 9b69e84a4..8974182d3 100644 --- a/pkg/bee/client.go +++ b/pkg/bee/client.go @@ -3,7 +3,6 @@ package bee import ( "bytes" "context" - "crypto/tls" "encoding/json" "errors" "fmt" @@ -22,8 +21,6 @@ import ( "github.com/ethersphere/beekeeper/pkg/swap" ) -const retryCount int = 5 - // Client manages communication with the Bee node type Client struct { api *api.Client @@ -31,45 +28,43 @@ type Client struct { log logging.Logger name string apiURL *url.URL - // number of times to retry call - retry int + retryCount int } // ClientOptions holds optional parameters for the Client. type ClientOptions struct { - APIInsecureTLS bool - APIURL *url.URL - Name string - Retry int - SwapClient swap.Client + APIURL *url.URL + Name string + Retry int + SwapClient swap.Client + HTTPClient *http.Client + Logger logging.Logger } // NewClient returns Bee client -func NewClient(opts ClientOptions, log logging.Logger) (c *Client) { +func NewClient(opts ClientOptions) (c *Client, err error) { + if opts.HTTPClient == nil { + opts.HTTPClient = &http.Client{} + } + c = &Client{ - retry: retryCount, - log: log, + retryCount: 5, + log: opts.Logger, swapClient: opts.SwapClient, name: opts.Name, apiURL: opts.APIURL, } - httpClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: opts.APIInsecureTLS, - }, - }, + c.api, err = api.NewClient(opts.APIURL, opts.HTTPClient) + if err != nil { + return nil, fmt.Errorf("new api client: %w", err) } - if opts.APIURL != nil { - c.api = api.NewClient(opts.APIURL, &api.ClientOptions{HTTPClient: httpClient}) - } if opts.Retry > 0 { - c.retry = opts.Retry + c.retryCount = opts.Retry } - return + return c, nil } // Addresses represents node's addresses @@ -281,24 +276,23 @@ func (c *Client) HasChunks(ctx context.Context, a []swarm.Address) (has []bool, return has, count, nil } -// Overlay returns node's overlay address -func (c *Client) Overlay(ctx context.Context) (o swarm.Address, err error) { +// Overlay returns node's overlay address with retry mechanism +func (c *Client) Overlay(ctx context.Context) (swarm.Address, error) { var a api.Addresses - for r := 0; r < c.retry; r++ { - time.Sleep(2 * time.Duration(r) * time.Second) + var err error + + for r := 0; r < c.retryCount; r++ { + if r > 0 { + time.Sleep(2 * time.Duration(r) * time.Second) + } a, err = c.api.Node.Addresses(ctx) - if err != nil { - continue + if err == nil { + return a.Overlay, nil } - break } - if err != nil { - return swarm.Address{}, fmt.Errorf("get addresses: %w", err) - } - o = a.Overlay - return + return swarm.Address{}, fmt.Errorf("get addresses: %w", err) } // Peers returns addresses of node's peers @@ -471,7 +465,7 @@ func (c *Client) GetOrCreateMutableBatch(ctx context.Context, postageTTL time.Du if price > 0 { amount = (int64(postageTTL.Seconds()) / blockTime) * price } else { - c.log.Warningf("invalid chain price: %d", price) + c.log.Warningf("invalid chain price: %v", price) } batches, err := c.PostageBatches(ctx) @@ -711,8 +705,10 @@ type Bin struct { // Topology returns Kademlia topology func (c *Client) Topology(ctx context.Context) (topology Topology, err error) { var t api.Topology - for r := 0; r < c.retry; r++ { - time.Sleep(2 * time.Duration(r) * time.Second) + for r := 0; r < c.retryCount; r++ { + if r > 0 { + time.Sleep(2 * time.Duration(r) * time.Second) + } t, err = c.api.Node.Topology(ctx) if err != nil { diff --git a/pkg/check/gc/reserve.go b/pkg/check/gc/reserve.go index 77a8a63ee..ace64a82a 100644 --- a/pkg/check/gc/reserve.go +++ b/pkg/check/gc/reserve.go @@ -142,8 +142,6 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int } rnd := random.PseudoGenerator(o.Seed) - c.logger.Info("gc: reserve check") - c.logger.Infof("Seed: %d", o.Seed) node, err := cluster.RandomNode(ctx, rnd) if err != nil { @@ -153,17 +151,15 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int client := node.Client() - addr, err := client.Addresses(ctx) + overlay, err := client.Overlay(ctx) if err != nil { - return err + return fmt.Errorf("overlay: %w", err) } - overlay := addr.Overlay const ( - cheapBatchAmount = 1 - expensiveBatchAmount = 3 - batchDepth = uint64(8) // the depth for the batches that we buy - + cheapBatchAmount = 1 + expensiveBatchAmount = 3 + batchDepth = uint64(8) radiusAfterSecondBatch = 5 ) diff --git a/pkg/check/manifest/manifest.go b/pkg/check/manifest/manifest.go index bb787f750..260df2528 100644 --- a/pkg/check/manifest/manifest.go +++ b/pkg/check/manifest/manifest.go @@ -66,7 +66,7 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts int } rnd := random.PseudoGenerator(o.Seed) - clients, err := cluster.ShuffledFullNodeClients(ctx, rnd) + clients, err := cluster.RandomClients(ctx, rnd) if err != nil { return fmt.Errorf("node clients shuffle: %w", err) } diff --git a/pkg/httpx/transport.go b/pkg/httpx/transport.go new file mode 100644 index 000000000..708c38191 --- /dev/null +++ b/pkg/httpx/transport.go @@ -0,0 +1,18 @@ +package httpx + +import ( + "net/http" + + "github.com/ethersphere/beekeeper" +) + +var userAgent = "beekeeper/" + beekeeper.Version + +type HeaderRoundTripper struct { + Next http.RoundTripper +} + +func (hrt *HeaderRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set("User-Agent", userAgent) + return hrt.Next.RoundTrip(req) +} diff --git a/pkg/k8s/config.go b/pkg/k8s/config.go index 26c260548..4fc9a5149 100644 --- a/pkg/k8s/config.go +++ b/pkg/k8s/config.go @@ -13,7 +13,7 @@ import ( // newClientConfig returns a new default ClienConfig. func newClientConfig() *ClientConfig { return &ClientConfig{ - NewForConfig: kubernetes.NewForConfig, + NewForConfig: kubernetes.NewForConfig, // TODO: use NewForConfigAndClient NewIngressRouteClientForConfig: ingressroute.NewForConfig, InClusterConfig: rest.InClusterConfig, BuildConfigFromFlags: clientcmd.BuildConfigFromFlags, diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 05a29d386..aea6d3b1f 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -105,9 +105,9 @@ func NewClient(opts ...ClientOption) (c *Client, err error) { ct := NewCustomTransport(config, semaphore, c.logger) // Wrap the default transport with our custom transport. - config.WrapTransport = func(rt http.RoundTripper) http.RoundTripper { + config.Wrap(func(rt http.RoundTripper) http.RoundTripper { return ct.SetBaseTransport(rt) - } + }) clientset, err := c.clientConfig.NewForConfig(config) if err != nil { diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 5e6feeab9..69be8c26b 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -8,6 +8,7 @@ package logging import ( "io" + "net/http" "github.com/sirupsen/logrus" ) @@ -64,10 +65,10 @@ func (l *logger) GetLevel() string { } // WithLokiOption sets the hook for Loki logging. -func WithLokiOption(lokiEndpoint string) LoggerOption { +func WithLokiOption(lokiEndpoint string, httpClient *http.Client) LoggerOption { return func(l *logger) { if lokiEndpoint != "" { - l.Logger.AddHook(newLoki(lokiEndpoint)) + l.Logger.AddHook(newLoki(lokiEndpoint, httpClient)) } } } diff --git a/pkg/logging/logrusloki.go b/pkg/logging/logrusloki.go index 59de2d166..01c72c6b9 100644 --- a/pkg/logging/logrusloki.go +++ b/pkg/logging/logrusloki.go @@ -19,15 +19,20 @@ type LokiHook struct { httpclient *http.Client } -func newLoki(lokiEndpoint string) LokiHook { +func newLoki(lokiEndpoint string, httpClient *http.Client) LokiHook { hostname, err := os.Hostname() if err != nil { hostname = "unknown" } + + if httpClient == nil { + httpClient = &http.Client{Timeout: 5 * time.Second} + } + return LokiHook{ hostname: hostname, lokiEndpoint: lokiEndpoint, - httpclient: &http.Client{Timeout: 5 * time.Second}, + httpclient: httpClient, } } @@ -67,7 +72,7 @@ func (l LokiHook) executeHTTPRequest(batch *loki.Batch) error { return err } - req, err := http.NewRequest("POST", l.lokiEndpoint, bytes.NewReader(data)) + req, err := http.NewRequest(http.MethodPost, l.lokiEndpoint, bytes.NewReader(data)) if err != nil { return err } diff --git a/pkg/orchestration/cluster.go b/pkg/orchestration/cluster.go index 1a99915f9..98740b8b9 100644 --- a/pkg/orchestration/cluster.go +++ b/pkg/orchestration/cluster.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "math/rand" + "net/http" "net/url" "github.com/ethersphere/bee/v2/pkg/swarm" @@ -35,7 +36,7 @@ type Cluster interface { Peers(ctx context.Context, exclude ...string) (peers ClusterPeers, err error) RandomNode(ctx context.Context, r *rand.Rand) (node Node, err error) Settlements(ctx context.Context) (settlements ClusterSettlements, err error) - ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error) + RandomClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error) Size() (size int) Topologies(ctx context.Context) (topologies ClusterTopologies, err error) } @@ -47,11 +48,12 @@ type ClusterOptions struct { APIDomainInternal string APIInsecureTLS bool APIScheme string - K8SClient *k8s.Client - SwapClient swap.Client + DisableNamespace bool Labels map[string]string Namespace string - DisableNamespace bool + K8SClient *k8s.Client + SwapClient swap.Client + HTTPClient *http.Client } // ClusterAddresses represents addresses of all nodes in the cluster diff --git a/pkg/orchestration/k8s/cluster.go b/pkg/orchestration/k8s/cluster.go index 080d94564..ac04f89f5 100644 --- a/pkg/orchestration/k8s/cluster.go +++ b/pkg/orchestration/k8s/cluster.go @@ -2,11 +2,15 @@ package k8s import ( "context" + "crypto/tls" "fmt" "math/rand" + "net/http" + "time" "github.com/ethersphere/bee/v2/pkg/swarm" "github.com/ethersphere/beekeeper/pkg/bee" + "github.com/ethersphere/beekeeper/pkg/httpx" "github.com/ethersphere/beekeeper/pkg/logging" "github.com/ethersphere/beekeeper/pkg/orchestration" "github.com/ethersphere/beekeeper/pkg/orchestration/notset" @@ -22,35 +26,50 @@ type Cluster struct { name string opts orchestration.ClusterOptions nodeGroups map[string]orchestration.NodeGroup // set when groups are added to the cluster + httpClient *http.Client log logging.Logger } // NewCluster returns new cluster func NewCluster(name string, o orchestration.ClusterOptions, log logging.Logger) *Cluster { - var no orchestration.NodeOrchestrator + var nodeOrchestrator orchestration.NodeOrchestrator if o.K8SClient == nil { - no = ¬set.BeeClient{} + nodeOrchestrator = ¬set.BeeClient{} } else { - no = newNodeOrchestrator(o.K8SClient, log) + nodeOrchestrator = newNodeOrchestrator(o.K8SClient, log) } if o.SwapClient == nil { o.SwapClient = &swap.NotSet{} } + if o.HTTPClient == nil { + o.HTTPClient = &http.Client{} + } + return &Cluster{ name: name, - nodeOrchestrator: no, + nodeOrchestrator: nodeOrchestrator, opts: o, nodeGroups: make(map[string]orchestration.NodeGroup), log: log, + httpClient: &http.Client{ + Transport: &httpx.HeaderRoundTripper{ + Next: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: o.APIInsecureTLS, + }, + }, + }, + Timeout: 30 * time.Second, + }, } } // AddNodeGroup adds new node group to the cluster func (c *Cluster) AddNodeGroup(name string, o orchestration.NodeGroupOptions) { - c.nodeGroups[name] = NewNodeGroup(name, c.opts, c.nodeOrchestrator, o, c.log) + c.nodeGroups[name] = NewNodeGroup(name, c.opts, c.nodeOrchestrator, o, c.httpClient, c.log) } // Addresses returns ClusterAddresses @@ -223,8 +242,8 @@ func (c *Cluster) FullNodeNames() (names []string) { return } -// ShuffledFullNodeClients returns a list of full node clients shuffled -func (c *Cluster) ShuffledFullNodeClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error) { +// RandomClients returns a shuffled list of full node clients +func (c *Cluster) RandomClients(ctx context.Context, r *rand.Rand) ([]*bee.Client, error) { var res []*bee.Client for _, node := range c.Nodes() { cfg := node.Config() diff --git a/pkg/orchestration/k8s/node.go b/pkg/orchestration/k8s/node.go index 5dbb7cb12..bcd407c3b 100644 --- a/pkg/orchestration/k8s/node.go +++ b/pkg/orchestration/k8s/node.go @@ -14,16 +14,18 @@ var _ orchestration.Node = (*Node)(nil) // Node represents Bee node type Node struct { orchestration.NodeOrchestrator - name string - opts orchestration.NodeOptions - log logging.Logger + name string + client *bee.Client + opts orchestration.NodeOptions + log logging.Logger } // NewNode returns Bee node -func NewNode(name string, opts orchestration.NodeOptions, no orchestration.NodeOrchestrator, log logging.Logger) (n *Node) { +func NewNode(name string, client *bee.Client, opts orchestration.NodeOptions, no orchestration.NodeOrchestrator, log logging.Logger) (n *Node) { return &Node{ NodeOrchestrator: no, name: name, + client: client, opts: opts, log: log, } @@ -36,7 +38,7 @@ func (n Node) Name() string { // Client returns node's name func (n Node) Client() *bee.Client { - return n.opts.Client + return n.client } // Config returns node's config diff --git a/pkg/orchestration/k8s/nodegroup.go b/pkg/orchestration/k8s/nodegroup.go index ee3f91cc2..3ff7e49a6 100644 --- a/pkg/orchestration/k8s/nodegroup.go +++ b/pkg/orchestration/k8s/nodegroup.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "net/url" "sort" "sync" @@ -30,12 +31,13 @@ type NodeGroup struct { nodes map[string]orchestration.Node opts orchestration.NodeGroupOptions clusterOpts orchestration.ClusterOptions + httpClient *http.Client log logging.Logger lock sync.RWMutex } // NewNodeGroup returns new node group -func NewNodeGroup(name string, copts orchestration.ClusterOptions, no orchestration.NodeOrchestrator, ngopts orchestration.NodeGroupOptions, log logging.Logger) *NodeGroup { +func NewNodeGroup(name string, copts orchestration.ClusterOptions, no orchestration.NodeOrchestrator, ngopts orchestration.NodeGroupOptions, httpClient *http.Client, log logging.Logger) *NodeGroup { ngopts.Annotations = mergeMaps(ngopts.Annotations, copts.Annotations) ngopts.Labels = mergeMaps(ngopts.Labels, copts.Labels) @@ -50,28 +52,29 @@ func NewNodeGroup(name string, copts orchestration.ClusterOptions, no orchestrat } // AddNode adss new node to the node group -func (g *NodeGroup) AddNode(ctx context.Context, name string, inCluster bool, o orchestration.NodeOptions, opts ...orchestration.BeeClientOption) (err error) { - var aURL *url.URL +func (g *NodeGroup) AddNode(ctx context.Context, name string, inCluster bool, nodeOptions orchestration.NodeOptions, opts ...orchestration.BeeClientOption) (err error) { + var apiURL *url.URL - aURL, err = g.clusterOpts.ApiURL(name, inCluster) + apiURL, err = g.clusterOpts.ApiURL(name, inCluster) if err != nil { return fmt.Errorf("API URL %s: %w", name, err) } // TODO: make more granular, check every sub-option var config *orchestration.Config - if o.Config != nil { - config = o.Config + if nodeOptions.Config != nil { + config = nodeOptions.Config } else { config = g.opts.BeeConfig } beeClientOpts := bee.ClientOptions{ - Name: name, - APIURL: aURL, - APIInsecureTLS: g.clusterOpts.APIInsecureTLS, - Retry: 5, - SwapClient: g.clusterOpts.SwapClient, + Name: name, + APIURL: apiURL, + Retry: 5, + SwapClient: g.clusterOpts.SwapClient, + HTTPClient: g.httpClient, + Logger: g.log, } for _, opt := range opts { @@ -80,13 +83,15 @@ func (g *NodeGroup) AddNode(ctx context.Context, name string, inCluster bool, o } } - client := bee.NewClient(beeClientOpts, g.log) + client, err := bee.NewClient(beeClientOpts) + if err != nil { + return fmt.Errorf("bee client: %w", err) + } - n := NewNode(name, orchestration.NodeOptions{ - Client: client, + n := NewNode(name, client, orchestration.NodeOptions{ Config: config, - LibP2PKey: o.LibP2PKey, - SwarmKey: o.SwarmKey, + LibP2PKey: nodeOptions.LibP2PKey, + SwarmKey: nodeOptions.SwarmKey, }, g.nodeOrchestrator, g.log) g.addNode(n) @@ -310,7 +315,7 @@ func (g *NodeGroup) BalancesStream(ctx context.Context) (<-chan BalancesStreamMs } // CreateNode creates new node in the k8s cluster -func (g *NodeGroup) CreateNode(ctx context.Context, name string) (err error) { +func (g *NodeGroup) createNode(ctx context.Context, name string) (err error) { labels := mergeMaps(g.opts.Labels, map[string]string{ "app.kubernetes.io/instance": name, }) @@ -357,8 +362,7 @@ func (g *NodeGroup) CreateNode(ctx context.Context, name string) (err error) { // DeleteNode deletes node from the k8s cluster and removes it from the node group func (g *NodeGroup) DeleteNode(ctx context.Context, name string) (err error) { - n := NewNode(name, orchestration.NodeOptions{}, g.nodeOrchestrator, g.log) - if err := n.Delete(ctx, g.clusterOpts.Namespace); err != nil { + if err := g.nodeOrchestrator.Delete(ctx, name, g.clusterOpts.Namespace); err != nil { return err } @@ -368,7 +372,7 @@ func (g *NodeGroup) DeleteNode(ctx context.Context, name string) (err error) { } // GetEthAddress returns ethereum address of the node -func (ng *NodeGroup) GetEthAddress(ctx context.Context, name string, o orchestration.NodeOptions) (string, error) { +func (ng *NodeGroup) getEthAddress(ctx context.Context, name string, o orchestration.NodeOptions) (string, error) { var a bee.Addresses a.Ethereum, _ = o.SwarmKey.GetEthAddress() if a.Ethereum == "" { @@ -460,11 +464,6 @@ func (g *NodeGroup) HasChunkStream(ctx context.Context, a swarm.Address) (<-chan return hasChunkStream, nil } -// Name returns name of the node group -func (g *NodeGroup) Name() string { - return g.name -} - // Nodes returns map of nodes in the node group func (g *NodeGroup) Nodes() map[string]orchestration.Node { nodes := make(map[string]orchestration.Node) @@ -638,8 +637,8 @@ func (g *NodeGroup) PeersStream(ctx context.Context) (<-chan PeersStreamMsg, err return peersStream, nil } -// NodeReady returns node's readiness -func (g *NodeGroup) NodeReady(ctx context.Context, name string) (ok bool, err error) { +// nodeReady returns node's readiness +func (g *NodeGroup) nodeReady(ctx context.Context, name string) (ok bool, err error) { n, err := g.getNode(name) if err != nil { return false, err @@ -648,8 +647,8 @@ func (g *NodeGroup) NodeReady(ctx context.Context, name string) (ok bool, err er return n.Ready(ctx, g.clusterOpts.Namespace) } -// PregenerateSwarmKey for a node if needed -func (g *NodeGroup) PregenerateSwarmKey(ctx context.Context, name string) (err error) { +// pregenerateSwarmKey for a node if needed +func (g *NodeGroup) pregenerateSwarmKey(ctx context.Context, name string) (err error) { n, err := g.getNode(name) if err != nil { return err @@ -708,26 +707,26 @@ func (g *NodeGroup) RunningNodes(ctx context.Context) (running []string, err err } // SetupNode creates new node in the node group, starts it in the k8s cluster and funds it -func (g *NodeGroup) SetupNode(ctx context.Context, name string, inCluster bool, o orchestration.NodeOptions) (ethAddress string, err error) { - g.log.Infof("starting setup node: %s", name) +func (g *NodeGroup) DeployNode(ctx context.Context, name string, inCluster bool, nodeOptions orchestration.NodeOptions) (ethAddress string, err error) { + g.log.Infof("deploying node %s", name) - if err := g.AddNode(ctx, name, inCluster, o); err != nil { + if err := g.AddNode(ctx, name, inCluster, nodeOptions); err != nil { return "", fmt.Errorf("add node %s: %w", name, err) } - if err := g.PregenerateSwarmKey(ctx, name); err != nil { + if err := g.pregenerateSwarmKey(ctx, name); err != nil { return "", fmt.Errorf("pregenerate Swarm key for node %s: %w", name, err) } - if err := g.CreateNode(ctx, name); err != nil { + if err := g.createNode(ctx, name); err != nil { return "", fmt.Errorf("create node %s in k8s: %w", name, err) } - if err := g.StartNode(ctx, name); err != nil { + if err := g.startNode(ctx, name); err != nil { return "", fmt.Errorf("start node %s in k8s: %w", name, err) } - ethAddress, err = g.GetEthAddress(ctx, name, o) + ethAddress, err = g.getEthAddress(ctx, name, nodeOptions) if err != nil { return "", fmt.Errorf("get eth address for funding: %w", err) } @@ -819,7 +818,7 @@ func (g *NodeGroup) Size() int { } // StartNode start node by scaling its statefulset to 1 -func (g *NodeGroup) StartNode(ctx context.Context, name string) (err error) { +func (g *NodeGroup) startNode(ctx context.Context, name string) (err error) { n, err := g.getNode(name) if err != nil { return err @@ -832,7 +831,7 @@ func (g *NodeGroup) StartNode(ctx context.Context, name string) (err error) { g.log.Infof("wait for %s to become ready", name) for { - ok, err := g.NodeReady(ctx, name) + ok, err := g.nodeReady(ctx, name) if err != nil { return fmt.Errorf("node %s readiness: %w", name, err) } @@ -844,32 +843,6 @@ func (g *NodeGroup) StartNode(ctx context.Context, name string) (err error) { } } -// StopNode stops node by scaling down its statefulset to 0 -func (g *NodeGroup) StopNode(ctx context.Context, name string) (err error) { - n, err := g.getNode(name) - if err != nil { - return err - } - - if err := n.Stop(ctx, g.clusterOpts.Namespace); err != nil { - return err - } - - g.log.Infof("wait for %s to stop", name) - - for { - ok, err := g.NodeReady(ctx, name) - if err != nil { - return fmt.Errorf("node %s readiness: %w", name, err) - } - - if !ok { - g.log.Infof("%s is stopped", name) - return nil - } - } -} - // StoppedNodes returns list of stopped nodes // TODO: filter by labels func (g *NodeGroup) StoppedNodes(ctx context.Context) (stopped []string, err error) { diff --git a/pkg/orchestration/k8s/node_orchestrator.go b/pkg/orchestration/k8s/orchestrator.go similarity index 99% rename from pkg/orchestration/k8s/node_orchestrator.go rename to pkg/orchestration/k8s/orchestrator.go index 0d96a4c3e..42b591b91 100644 --- a/pkg/orchestration/k8s/node_orchestrator.go +++ b/pkg/orchestration/k8s/orchestrator.go @@ -40,7 +40,7 @@ func (n *nodeOrchestrator) RunningNodes(ctx context.Context, namespace string) ( if err != nil { return nil, fmt.Errorf("running statefulsets in namespace %s: %w", namespace, err) } - return + return running, nil } // StoppedNodes implements orchestration.NodeOrchestrator. @@ -49,7 +49,7 @@ func (n *nodeOrchestrator) StoppedNodes(ctx context.Context, namespace string) ( if err != nil { return nil, fmt.Errorf("stopped statefulsets in namespace %s: %w", namespace, err) } - return + return stopped, nil } // Create @@ -309,7 +309,7 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt } n.log.Infof("statefulset %s is set in namespace %s", sSet, o.Namespace) - return + return nil } func (n *nodeOrchestrator) Delete(ctx context.Context, name string, namespace string) (err error) { @@ -373,7 +373,8 @@ func (n *nodeOrchestrator) Delete(ctx context.Context, name string, namespace st n.log.Infof("configmap %s is deleted in namespace %s", configCM, namespace) n.log.Infof("node %s is deleted in namespace %s", name, namespace) - return + + return nil } func (n *nodeOrchestrator) Ready(ctx context.Context, name string, namespace string) (ready bool, err error) { @@ -393,7 +394,8 @@ func (n *nodeOrchestrator) Start(ctx context.Context, name string, namespace str } n.log.Infof("node %s is started in namespace %s", name, namespace) - return + + return nil } func (n *nodeOrchestrator) Stop(ctx context.Context, name string, namespace string) (err error) { @@ -403,5 +405,6 @@ func (n *nodeOrchestrator) Stop(ctx context.Context, name string, namespace stri } n.log.Infof("node %s is stopped in namespace %s", name, namespace) - return + + return nil } diff --git a/pkg/orchestration/node.go b/pkg/orchestration/node.go index 893dc6a9d..94c18385f 100644 --- a/pkg/orchestration/node.go +++ b/pkg/orchestration/node.go @@ -11,7 +11,6 @@ import ( "github.com/ethersphere/beekeeper/pkg/bee" ) -// ErrNotSet represents error when orchestration client is not set var ErrNotSet = errors.New("orchestration client not set") type Node interface { @@ -69,7 +68,6 @@ func (ek EncryptedKey) GetEthAddress() (string, error) { // NodeOptions holds optional parameters for the Node. type NodeOptions struct { - Client *bee.Client Config *Config LibP2PKey string SwarmKey EncryptedKey diff --git a/pkg/orchestration/nodegroup.go b/pkg/orchestration/nodegroup.go index 1b312395d..22388a8d4 100644 --- a/pkg/orchestration/nodegroup.go +++ b/pkg/orchestration/nodegroup.go @@ -14,13 +14,10 @@ type NodeGroup interface { AddNode(ctx context.Context, name string, inCluster bool, o NodeOptions, opts ...BeeClientOption) (err error) Addresses(ctx context.Context) (addrs NodeGroupAddresses, err error) Balances(ctx context.Context) (balances NodeGroupBalances, err error) - CreateNode(ctx context.Context, name string) (err error) DeleteNode(ctx context.Context, name string) (err error) - GetEthAddress(ctx context.Context, name string, o NodeOptions) (ethAddress string, err error) + DeployNode(ctx context.Context, name string, inCluster bool, o NodeOptions) (ethAddress string, err error) GroupReplicationFactor(ctx context.Context, a swarm.Address) (grf int, err error) - Name() string NodeClient(name string) (*bee.Client, error) - NodeReady(ctx context.Context, name string) (ok bool, err error) Nodes() map[string]Node NodesClients(ctx context.Context) (map[string]*bee.Client, error) NodesSorted() (l []string) @@ -28,10 +25,7 @@ type NodeGroup interface { Peers(ctx context.Context) (peers NodeGroupPeers, err error) RunningNodes(ctx context.Context) (running []string, err error) Settlements(ctx context.Context) (settlements NodeGroupSettlements, err error) - SetupNode(ctx context.Context, name string, inCluster bool, o NodeOptions) (ethAddress string, err error) Size() int - StartNode(ctx context.Context, name string) (err error) - StopNode(ctx context.Context, name string) (err error) StoppedNodes(ctx context.Context) (stopped []string, err error) Topologies(ctx context.Context) (topologies NodeGroupTopologies, err error) } diff --git a/pkg/stamper/node.go b/pkg/stamper/node.go index 209c96b57..ebf30799e 100644 --- a/pkg/stamper/node.go +++ b/pkg/stamper/node.go @@ -169,7 +169,7 @@ func (n *node) getPrice(ctx context.Context) (int64, error) { price := chainState.CurrentPrice.Int64() if price <= 0 { - return 0, fmt.Errorf("node %s: invalid chain price: %d", n.name, price) + return 0, fmt.Errorf("node %s: invalid chain price: %v", n.name, price) } return price, nil diff --git a/pkg/stamper/stamper.go b/pkg/stamper/stamper.go index 7263c4915..a1e3e76a3 100644 --- a/pkg/stamper/stamper.go +++ b/pkg/stamper/stamper.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "net/http" "net/url" "time" @@ -37,6 +38,7 @@ func WithPostageLabels(postageLabels []string) Option { type ClientConfig struct { Log logging.Logger Namespace string + HTTPClient *http.Client K8sClient *k8s.Client SwapClient swap.Client BeeClients map[string]*bee.Client @@ -49,6 +51,7 @@ type Client struct { namespace string k8sClient *k8s.Client swapClient swap.Client + httpClient *http.Client beeClients map[string]*bee.Client labelSelector string inCluster bool @@ -67,6 +70,10 @@ func New(cfg *ClientConfig) *Client { cfg.SwapClient = &swap.NotSet{} } + if cfg.HTTPClient == nil { + cfg.HTTPClient = &http.Client{} + } + return &Client{ log: cfg.Log, namespace: cfg.Namespace, @@ -75,6 +82,7 @@ func New(cfg *ClientConfig) *Client { beeClients: cfg.BeeClients, labelSelector: cfg.LabelSelector, inCluster: cfg.InCluster, + httpClient: cfg.HTTPClient, } } @@ -238,7 +246,10 @@ func (sc *Client) getServiceNodes(ctx context.Context) ([]node, error) { return nil, fmt.Errorf("extract base URL: %w", err) } - apiClient := api.NewClient(parsedURL, nil) + apiClient, err := api.NewClient(parsedURL, sc.httpClient) + if err != nil { + return nil, fmt.Errorf("create api client: %w", err) + } nodes[i] = *newNodeInfo(apiClient, node.Name, sc.log) } @@ -260,12 +271,15 @@ func (sc *Client) getIngressNodes(ctx context.Context) ([]node, error) { allNodes := append(ingressNodes, ingressRouteNodes...) nodes := make([]node, len(allNodes)) for i, node := range allNodes { - parsedURL, err := url.Parse(fmt.Sprintf("http://%s", node.Host)) + apiURL, err := url.Parse(fmt.Sprintf("http://%s", node.Host)) if err != nil { return nil, fmt.Errorf("extract base URL: %w", err) } - apiClient := api.NewClient(parsedURL, nil) + apiClient, err := api.NewClient(apiURL, sc.httpClient) + if err != nil { + return nil, fmt.Errorf("create api client: %w", err) + } nodes[i] = *newNodeInfo(apiClient, node.Name, sc.log) } diff --git a/pkg/swap/block.go b/pkg/swap/block.go index 001e214ea..7f7d3ce56 100644 --- a/pkg/swap/block.go +++ b/pkg/swap/block.go @@ -102,7 +102,7 @@ func (g *GethClient) fetchLatestBlockNumber(ctx context.Context) (int64, error) ID string `json:"id"` }) - if err := requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + if err := g.requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { return 0, fmt.Errorf("request json: %w", err) } @@ -138,7 +138,7 @@ func (g *GethClient) fetchBlockTimestamp(ctx context.Context, blockNumber int64) } `json:"result"` }) - if err := requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + if err := g.requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { return 0, fmt.Errorf("request json: %w", err) } diff --git a/pkg/swap/geth.go b/pkg/swap/geth.go index f262975fe..f763bc279 100644 --- a/pkg/swap/geth.go +++ b/pkg/swap/geth.go @@ -19,7 +19,8 @@ var _ Client = (*GethClient)(nil) type GethClient struct { bzzTokenAddress string ethAccount string - httpClient *http.Client // HTTP client must handle authentication implicitly + httpClient *http.Client + baseURL *url.URL logger logging.Logger } @@ -31,7 +32,7 @@ type GethClientOptions struct { } // NewClient constructs a new Client. -func NewGethClient(baseURL *url.URL, o *GethClientOptions, logger logging.Logger) (gc *GethClient) { +func NewGethClient(baseURL *url.URL, o *GethClientOptions, logger logging.Logger) *GethClient { if o == nil { o = new(GethClientOptions) } @@ -48,14 +49,13 @@ func NewGethClient(baseURL *url.URL, o *GethClientOptions, logger logging.Logger o.EthAccount = EthAccount } - gc = &GethClient{ + return &GethClient{ bzzTokenAddress: o.BzzTokenAddress, ethAccount: o.EthAccount, - httpClient: httpClientWithTransport(baseURL, o.HTTPClient), + httpClient: o.HTTPClient, logger: logger, + baseURL: baseURL, } - - return } // ethRequest represents common eth request @@ -106,7 +106,7 @@ func (g *GethClient) SendETH(ctx context.Context, to string, amount float64) (tx Result string `json:"result"` }) - if err = requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + if err = g.requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { return "", err } @@ -143,7 +143,7 @@ func (g *GethClient) SendBZZ(ctx context.Context, to string, amount float64) (tx Result string `json:"result"` }) - if err = requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + if err = g.requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { return "", err } @@ -180,7 +180,7 @@ func (g *GethClient) SendGBZZ(ctx context.Context, to string, amount float64) (t Result string `json:"result"` }) - if err = requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + if err = g.requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { return "", err } @@ -216,7 +216,7 @@ func (g *GethClient) AttestOverlayEthAddress(ctx context.Context, ethAddr string Result string `json:"result"` }) - if err = requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { + if err = g.requestJSON(ctx, g.httpClient, http.MethodPost, "/", req, &resp); err != nil { return "", err } @@ -238,7 +238,7 @@ func (g *GethClient) ethAccounts(ctx context.Context) (a []string, err error) { Result []string `json:"result"` }) - if err := requestJSON(ctx, g.httpClient, http.MethodGet, "/", req, &resp); err != nil { + if err := g.requestJSON(ctx, g.httpClient, http.MethodGet, "/", req, &resp); err != nil { return nil, err } diff --git a/pkg/swap/http.go b/pkg/swap/http.go index 50a760910..27f2c7f16 100644 --- a/pkg/swap/http.go +++ b/pkg/swap/http.go @@ -5,23 +5,20 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "net/http" "net/url" "strings" - - "github.com/ethersphere/beekeeper" ) const contentType = "application/json; charset=utf-8" -var userAgent = "beekeeper/" + beekeeper.Version - // requestJSON handles the HTTP request response cycle. It JSON encodes the request // body, creates an HTTP request with provided method on a path with required // headers and decodes request body if the v argument is not nil and content type is // application/json. -func requestJSON(ctx context.Context, httpClient *http.Client, method, path string, body, v interface{}) (err error) { +func (c *GethClient) requestJSON(ctx context.Context, httpClient *http.Client, method, path string, body, v interface{}) (err error) { var bodyBuffer io.ReadWriter if body != nil { bodyBuffer = new(bytes.Buffer) @@ -30,12 +27,25 @@ func requestJSON(ctx context.Context, httpClient *http.Client, method, path stri } } - return request(ctx, httpClient, method, path, bodyBuffer, v) + return c.request(ctx, httpClient, method, path, bodyBuffer, v) +} + +func (c *GethClient) getFullURL(path string) (string, error) { + rel, err := url.Parse(path) + if err != nil { + return "", fmt.Errorf("failed to parse path: %w", err) + } + return c.baseURL.ResolveReference(rel).String(), nil } // request handles the HTTP request response cycle. -func request(ctx context.Context, httpClient *http.Client, method, path string, body io.Reader, v interface{}) (err error) { - req, err := http.NewRequest(method, path, body) +func (c *GethClient) request(ctx context.Context, httpClient *http.Client, method, path string, body io.Reader, v interface{}) (err error) { + fullURL, err := c.getFullURL(path) + if err != nil { + return err + } + + req, err := http.NewRequest(method, fullURL, body) if err != nil { return err } @@ -62,28 +72,6 @@ func request(ctx context.Context, httpClient *http.Client, method, path string, return nil } -func httpClientWithTransport(baseURL *url.URL, c *http.Client) *http.Client { - if c == nil { - c = new(http.Client) - } - - transport := c.Transport - if transport == nil { - transport = http.DefaultTransport - } - - if !strings.HasSuffix(baseURL.Path, "/") { - baseURL.Path += "/" - } - - c.Transport = roundTripperFunc(func(r *http.Request) (resp *http.Response, err error) { - r.Header.Set("User-Agent", userAgent) - r.URL = baseURL - return transport.RoundTrip(r) - }) - return c -} - // encodeJSON writes a JSON-encoded v object to the provided writer with // SetEscapeHTML set to false. func encodeJSON(w io.Writer, v interface{}) (err error) { @@ -152,17 +140,3 @@ func decodeBadRequest(r *http.Response) (err error) { } return NewBadRequestError(e.Errors...) } - -// Bool is a helper routine that allocates a new bool value to store v and -// returns a pointer to it. -func Bool(v bool) (p *bool) { return &v } - -// roundTripperFunc type is an adapter to allow the use of ordinary functions as -// http.RoundTripper interfaces. If f is a function with the appropriate -// signature, roundTripperFunc(f) is a http.RoundTripper that calls f. -type roundTripperFunc func(*http.Request) (*http.Response, error) - -// RoundTrip calls f(r). -func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) { - return f(r) -}