Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use single http client instance #459

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 8 additions & 13 deletions cmd/beekeeper/cmd/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co
cluster.AddNodeGroup(ngName, ngConfig.Export())

// delete nodes from the node group
g, err := cluster.NodeGroup(ngName)
ng, err := cluster.NodeGroup(ngName)
if err != nil {
return fmt.Errorf("get node group: %w", err)
}
Expand All @@ -54,7 +54,7 @@ func (c *command) deleteCluster(ctx context.Context, clusterName string, cfg *co
if len(v.Nodes[i].Name) > 0 {
nName = v.Nodes[i].Name
}
if err := g.DeleteNode(ctx, nName); err != nil {
if err := ng.DeleteNode(ctx, nName); err != nil {
return fmt.Errorf("deleting node %s from the node group %s: %w", nName, ngName, err)
}

Expand Down Expand Up @@ -201,6 +201,7 @@ func initializeCluster(clusterConfig config.Cluster, c *command) orchestration.C
clusterOpts := clusterConfig.Export()
clusterOpts.SwapClient = c.swapClient
clusterOpts.K8SClient = c.k8sClient
clusterOpts.HTTPClient = c.httpClient
return orchestrationK8S.NewCluster(clusterConfig.GetName(), clusterOpts, c.log)
}

Expand Down Expand Up @@ -313,7 +314,7 @@ func setupOrAddNode(ctx context.Context,
beeOpt orchestration.BeeClientOption,
) {
if startCluster {
ethAddress, err := ng.SetupNode(ctx, nodeName, inCluster, nodeOpts)
ethAddress, err := ng.DeployNode(ctx, nodeName, inCluster, nodeOpts)
ch <- nodeResult{
ethAddress: ethAddress,
err: err,
Expand All @@ -326,17 +327,11 @@ func setupOrAddNode(ctx context.Context,
}

func setupNodeOptions(node config.ClusterNode, bConfig *orchestration.Config) orchestration.NodeOptions {
nOptions := orchestration.NodeOptions{
Config: bConfig,
}

if len(node.LibP2PKey) > 0 {
nOptions.LibP2PKey = node.LibP2PKey
}
if len(node.SwarmKey) > 0 {
nOptions.SwarmKey = orchestration.EncryptedKey(node.SwarmKey)
return orchestration.NodeOptions{
Config: bConfig,
LibP2PKey: node.LibP2PKey,
SwarmKey: orchestration.EncryptedKey(node.SwarmKey),
}
return nOptions
}

func fund(
Expand Down
23 changes: 17 additions & 6 deletions cmd/beekeeper/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/ethersphere/beekeeper/pkg/config"
"github.com/ethersphere/beekeeper/pkg/httpx"
"github.com/ethersphere/beekeeper/pkg/k8s"
"github.com/ethersphere/beekeeper/pkg/logging"
"github.com/ethersphere/beekeeper/pkg/scheduler"
"github.com/ethersphere/beekeeper/pkg/swap"
"github.com/go-git/go-billy/v5/memfs"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/transport/http"
httptransport "github.com/go-git/go-git/v5/plumbing/transport/http"
"github.com/go-git/go-git/v5/storage/memory"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -61,7 +64,8 @@ type command struct {
globalConfigFile string
homeDir string
config *config.Config // beekeeper clusters configuration (config dir)
k8sClient *k8s.Client // kubernetes client
httpClient *http.Client
k8sClient *k8s.Client // kubernetes client
swapClient swap.Client
log logging.Logger
}
Expand All @@ -79,6 +83,12 @@ func newCommand(opts ...option) (c *command, err error) {
return c.initConfig(cmd.Flags().Changed(optionNameClusterName))
},
},
httpClient: &http.Client{
Transport: &httpx.HeaderRoundTripper{
Next: http.DefaultTransport,
},
Timeout: 30 * time.Second,
},
}

for _, o := range opts {
Expand Down Expand Up @@ -248,7 +258,7 @@ func (c *command) initLogger() error {
verbosity := c.globalConfig.GetString(optionNameLogVerbosity)
lokiEndpoint := c.globalConfig.GetString(optionNameLokiEndpoint)

log, err := newLogger(c.root, verbosity, lokiEndpoint)
log, err := newLogger(c.root, verbosity, lokiEndpoint, c.httpClient)
if err != nil {
return fmt.Errorf("new logger: %w", err)
}
Expand All @@ -263,7 +273,7 @@ func (c *command) loadConfigDirectory() error {
// read configuration from git repo
fs := memfs.New()
if _, err := git.Clone(memory.NewStorage(), fs, &git.CloneOptions{
Auth: &http.BasicAuth{
Auth: &httptransport.BasicAuth{
Username: c.globalConfig.GetString(optionNameConfigGitUsername),
Password: c.globalConfig.GetString(optionNameConfigGitPassword),
},
Expand Down Expand Up @@ -429,6 +439,7 @@ func (c *command) setSwapClient() (err error) {
c.swapClient = swap.NewGethClient(gethUrl, &swap.GethClientOptions{
BzzTokenAddress: c.globalConfig.GetString("bzz-token-address"),
EthAccount: c.globalConfig.GetString("eth-account"),
HTTPClient: c.httpClient,
}, c.log)
} else {
c.swapClient = &swap.NotSet{}
Expand All @@ -437,10 +448,10 @@ func (c *command) setSwapClient() (err error) {
return
}

func newLogger(cmd *cobra.Command, verbosity, lokiEndpoint string) (logging.Logger, error) {
func newLogger(cmd *cobra.Command, verbosity, lokiEndpoint string, httpClient *http.Client) (logging.Logger, error) {
var logger logging.Logger
opts := []logging.LoggerOption{
logging.WithLokiOption(lokiEndpoint),
logging.WithLokiOption(lokiEndpoint, httpClient),
logging.WithMetricsOption(),
}

Expand Down
2 changes: 2 additions & 0 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ checks:
options:
cache-size: 10
reserve-size: 16
postage-label: gc-check
timeout: 5m
type: gc
kademlia:
Expand Down Expand Up @@ -236,6 +237,7 @@ checks:
type: postage
timeout: 5m
options:
postage-label: postage-check
postage-amount: 1000
postage-depth: 17
postage-topup-amount: 100
Expand Down
2 changes: 2 additions & 0 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ checks:
options:
cache-size: 10
reserve-size: 16
postage-label: gc-check
timeout: 5m
type: gc
ci-manifest:
Expand Down Expand Up @@ -311,6 +312,7 @@ checks:
type: postage
timeout: 5m
options:
postage-label: postage-check
postage-amount: 1000
postage-depth: 17
postage-topup-amount: 100
Expand Down
1 change: 1 addition & 0 deletions config/public-testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ checks:
type: postage
timeout: 30m
options:
postage-label: postage-check
postage-amount: 140000000
postage-depth: 17
postage-topup-amount: 100
Expand Down
118 changes: 38 additions & 80 deletions pkg/bee/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"net/url"
"strconv"
"strings"

"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/ethersphere/beekeeper"
)

const (
Expand All @@ -38,11 +35,10 @@ const (
swarmErrorDocumentHeader = "Swarm-Error-Document"
)

var userAgent = "beekeeper/" + beekeeper.Version

// Client manages communication with the Bee API.
type Client struct {
httpClient *http.Client // HTTP client must handle authentication implicitly.
apiURL *url.URL // Base URL for API requests.
service service // Reuse a single struct instead of allocating one for each service on the heap.

// Services that API provides.
Expand Down Expand Up @@ -70,22 +66,24 @@ type ClientOptions struct {
}

// NewClient constructs a new Client.
func NewClient(baseURL *url.URL, o *ClientOptions) (c *Client) {
if o == nil {
o = new(ClientOptions)
func NewClient(apiURL *url.URL, httpClient *http.Client) (*Client, error) {
if httpClient == nil {
httpClient = &http.Client{}
}
if o.HTTPClient == nil {
o.HTTPClient = new(http.Client)
if apiURL == nil {
return nil, errors.New("baseURL is required")
}

c = newClient(httpClientWithTransport(baseURL, o.HTTPClient))
return
return newClient(apiURL, httpClient), nil
}

// newClient constructs a new *Client with the provided http Client, which
// should handle authentication implicitly, and sets all API services.
func newClient(httpClient *http.Client) (c *Client) {
c = &Client{httpClient: httpClient}
func newClient(apiURL *url.URL, httpClient *http.Client) (c *Client) {
c = &Client{
httpClient: httpClient,
apiURL: apiURL,
}
c.service.client = c

c.Act = (*ActService)(&c.service)
Expand All @@ -108,32 +106,6 @@ func newClient(httpClient *http.Client) (c *Client) {
return c
}

func httpClientWithTransport(baseURL *url.URL, c *http.Client) *http.Client {
if c == nil {
c = new(http.Client)
}

transport := c.Transport
if transport == nil {
transport = http.DefaultTransport
}

if !strings.HasSuffix(baseURL.Path, "/") {
baseURL.Path += "/"
}

c.Transport = roundTripperFunc(func(r *http.Request) (resp *http.Response, err error) {
r.Header.Set("User-Agent", userAgent)
u, err := baseURL.Parse(r.URL.String())
if err != nil {
return nil, err
}
r.URL = u
return transport.RoundTrip(r)
})
return c
}

// requestJSON handles the HTTP request response cycle. It JSON encodes the request
// body, creates an HTTP request with provided method on a path with required
// headers and decodes request body if the v argument is not nil and content type is
Expand All @@ -150,9 +122,22 @@ func (c *Client) requestJSON(ctx context.Context, method, path string, body, v i
return c.request(ctx, method, path, bodyBuffer, v)
}

func (c *Client) getFullURL(path string) (string, error) {
rel, err := url.Parse(path)
if err != nil {
return "", fmt.Errorf("failed to parse path: %w", err)
}
return c.apiURL.ResolveReference(rel).String(), nil
}

// request handles the HTTP request response cycle.
func (c *Client) request(ctx context.Context, method, path string, body io.Reader, v interface{}) (err error) {
req, err := http.NewRequest(method, path, body)
fullURL, err := c.getFullURL(path)
if err != nil {
return err
}

req, err := http.NewRequest(method, fullURL, body)
if err != nil {
return err
}
Expand Down Expand Up @@ -196,7 +181,12 @@ func (c *Client) requestData(ctx context.Context, method, path string, body io.R

// requestDataGetHeader handles the HTTP request response cycle and returns the response body and header.
func (c *Client) requestDataGetHeader(ctx context.Context, method, path string, body io.Reader, opts *DownloadOptions) (resp io.ReadCloser, h http.Header, err error) {
req, err := http.NewRequest(method, path, body)
fullURL, err := c.getFullURL(path)
if err != nil {
return nil, nil, err
}

req, err := http.NewRequest(method, fullURL, body)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -245,7 +235,12 @@ func (c *Client) requestDataGetHeader(ctx context.Context, method, path string,

// requestWithHeader handles the HTTP request response cycle.
func (c *Client) requestWithHeader(ctx context.Context, method, path string, header http.Header, body io.Reader, v interface{}, headerParser ...func(http.Header)) (err error) {
req, err := http.NewRequest(method, path, body)
fullURL, err := c.getFullURL(path)
if err != nil {
return err
}

req, err := http.NewRequest(method, fullURL, body)
if err != nil {
return err
}
Expand Down Expand Up @@ -324,40 +319,3 @@ func responseErrorHandler(r *http.Response) (err error) {
type service struct {
client *Client
}

// Bool is a helper routine that allocates a new bool value to store v and
// returns a pointer to it.
func Bool(v bool) (p *bool) { return &v }

// roundTripperFunc type is an adapter to allow the use of ordinary functions as
// http.RoundTripper interfaces. If f is a function with the appropriate
// signature, roundTripperFunc(f) is a http.RoundTripper that calls f.
type roundTripperFunc func(*http.Request) (*http.Response, error)

// RoundTrip calls f(r).
func (f roundTripperFunc) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r)
}

type UploadOptions struct {
Act bool
Pin bool
Tag uint64
BatchID string
Direct bool
ActHistoryAddress swarm.Address

// Dirs
IndexDocument string
ErrorDocument string
}

type DownloadOptions struct {
Act *bool
ActHistoryAddress *swarm.Address
ActPublicKey *swarm.Address
ActTimestamp *uint64
Cache *bool
RedundancyFallbackMode *bool
OnlyRootChunk *bool
}
26 changes: 26 additions & 0 deletions pkg/bee/api/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package api

import "github.com/ethersphere/bee/v2/pkg/swarm"

type UploadOptions struct {
Act bool
Pin bool
Tag uint64
BatchID string
Direct bool
ActHistoryAddress swarm.Address

// Dirs
IndexDocument string
ErrorDocument string
}

type DownloadOptions struct {
Act *bool
ActHistoryAddress *swarm.Address
ActPublicKey *swarm.Address
ActTimestamp *uint64
Cache *bool
RedundancyFallbackMode *bool
OnlyRootChunk *bool
}
Loading