Skip to content

Commit

Permalink
aeon: add connection from the etcd/tcs config
Browse files Browse the repository at this point in the history
@@@@Tarantool_box document
Title: add connection from the etcd/tcs config

Added ability to connect to gRpc server used a configuration etcd/tcs

tt aeon connect URI INSTANCE_NAME

Closes #1052
  • Loading branch information
AlexandrLitkevich committed Feb 21, 2025
1 parent 648f98a commit 46ee573
Show file tree
Hide file tree
Showing 19 changed files with 1,022 additions and 51 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

- `tt pack `: added TCM file packaging.
- `tt aeon connect`: add connection from the cluster config.
- `tt aeon connect`: add connection from the `app:insance_name`.
- `tt aeon connect`: add connection from the etcd/tcs config.

### Changed

Expand Down
1 change: 0 additions & 1 deletion cli/aeon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func getTlsConfig(args cmd.Ssl) (*tls.Config, error) {
}
}
// Else if RootCAs is nil, TLS uses the host's root CA set.

cert, err := getCertificate(args)
if err != nil {
return nil, fmt.Errorf("failed get certificate: %w", err)
Expand Down
131 changes: 131 additions & 0 deletions cli/aeon/cmd/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package cmd

import (
"context"
"fmt"
"os"

"github.com/tarantool/go-tarantool/v2"
libcluster "github.com/tarantool/tt/lib/cluster"
"github.com/tarantool/tt/lib/connect"
clientv3 "go.etcd.io/etcd/client/v3"
)

// connectEtcd establishes a connection to etcd.
func connectEtcd(uriOpts UriOpts, connOpts connectOpts) (*clientv3.Client, error) {
etcdOpts := MakeEtcdOptsFromUriOpts(uriOpts)
if etcdOpts.Username == "" && etcdOpts.Password == "" {
etcdOpts.Username = connOpts.Username
etcdOpts.Password = connOpts.Password
if etcdOpts.Username == "" {
etcdOpts.Username = os.Getenv(connect.EtcdUsernameEnv)
}
if etcdOpts.Password == "" {
etcdOpts.Password = os.Getenv(connect.EtcdPasswordEnv)
}
}

etcdcli, err := libcluster.ConnectEtcd(etcdOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %w", err)
}
return etcdcli, nil
}

// connectTarantool establishes a connection to Tarantool.
func connectTarantool(uriOpts UriOpts, connOpts connectOpts) (tarantool.Connector, error) {
if uriOpts.Username == "" && uriOpts.Password == "" {
uriOpts.Username = connOpts.Username
uriOpts.Password = connOpts.Password
if uriOpts.Username == "" {
uriOpts.Username = os.Getenv(connect.TarantoolUsernameEnv)
}
if uriOpts.Password == "" {
uriOpts.Password = os.Getenv(connect.TarantoolPasswordEnv)
}
}

dialer, connectorOpts := MakeConnectOptsFromUriOpts(uriOpts)

ctx := context.Background()
if connectorOpts.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, connectorOpts.Timeout)
defer cancel()
}
conn, err := tarantool.Connect(ctx, dialer, connectorOpts)
if err != nil {
return nil, fmt.Errorf("failed to connect to tarantool: %w", err)
}
return conn, nil
}

// doOnStorage determines a storage based on the opts.
func doOnStorage(connOpts connectOpts, opts UriOpts,
tarantoolFunc func(tarantool.Connector) error, etcdFunc func(*clientv3.Client) error) error {
etcdcli, errEtcd := connectEtcd(opts, connOpts)
if errEtcd == nil {
return etcdFunc(etcdcli)
}

conn, errTarantool := connectTarantool(opts, connOpts)
if errTarantool == nil {
return tarantoolFunc(conn)
}

return fmt.Errorf("failed to establish a connection to tarantool or etcd: %w, %w",
errTarantool, errEtcd)
}

// createPublisherAndCollector creates a new data publisher and collector based on UriOpts.
func createPublisherAndCollector(
publishers libcluster.DataPublisherFactory,
collectors libcluster.CollectorFactory,
connOpts connectOpts,
opts UriOpts) (libcluster.DataPublisher, libcluster.Collector, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Key, opts.Timeout

var (
publisher libcluster.DataPublisher
collector libcluster.Collector
err error
closeFunc func()
)

tarantoolFunc := func(conn tarantool.Connector) error {
if collectors != nil {
collector, err = collectors.NewTarantool(conn, prefix, key, timeout)
if err != nil {
conn.Close()
return fmt.Errorf("failed to create tarantool config storage collector: %w", err)
}
}
closeFunc = func() { conn.Close() }
return nil
}

etcdFunc := func(client *clientv3.Client) error {
if publishers != nil {
publisher, err = publishers.NewEtcd(client, prefix, key, timeout)
if err != nil {
client.Close()
return fmt.Errorf("failed to create etcd publisher: %w", err)
}
}
if collectors != nil {
collector, err = collectors.NewEtcd(client, prefix, key, timeout)
if err != nil {
client.Close()
return fmt.Errorf("failed to create etcd collector: %w", err)
}
}
closeFunc = func() { client.Close() }
return nil
}

if err := doOnStorage(connOpts, opts, tarantoolFunc, etcdFunc); err != nil {
return nil, nil, nil, err
}

return publisher, collector, closeFunc, nil
}
10 changes: 10 additions & 0 deletions cli/aeon/cmd/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ type Ssl struct {

// ConnectCtx keeps context information for aeon connection.
type ConnectCtx struct {
// Username defines a username for connection.
Username string
// Password defines a password for connection.
Password string
// Ssl group of paths to ssl key files.
Ssl Ssl
// Transport is a connection mode.
Expand Down Expand Up @@ -41,3 +45,9 @@ type AdvertiseParams struct {
// CaFile path to the trusted certificate authorities (CA) file (optional).
CaFile string `mapstructure:"ssl_ca_file"`
}

// connectOpts is additional connect options specified by a user.
type connectOpts struct {
Username string
Password string
}
84 changes: 84 additions & 0 deletions cli/aeon/cmd/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cmd

import (
"errors"
"fmt"
"net/url"

"github.com/mitchellh/mapstructure"
"github.com/tarantool/tt/cli/util"
libcluster "github.com/tarantool/tt/lib/cluster"
libconnect "github.com/tarantool/tt/lib/connect"
)

func ShowUri(connectCtx *ConnectCtx, uri *url.URL,
instanceName string, collectors libcluster.CollectorFactory) error {
uriOpts, err := ParseUriOpts(uri)
if err != nil {
return fmt.Errorf("invalid URL %q: %w", uri, err)
}

connOpts := connectOpts{
Username: connectCtx.Username,
Password: connectCtx.Password,
}
_, collector, cancel, err := createPublisherAndCollector(
nil,
collectors,
connOpts, uriOpts)
if err != nil {
return err
}
defer cancel()

config, err := collector.Collect()
if err != nil {
return fmt.Errorf("failed to collect a configuration: %w", err)
}

clusterConfig, err := libcluster.MakeClusterConfig(config)
if err != nil {
return err
}

result := libcluster.Instantiate(clusterConfig, instanceName)

dataSsl := []string{"roles_cfg", "aeon.grpc", "advertise"}
data, err := result.Get(dataSsl)
if err != nil {
return err
}

var advertise Advertise
err = mapstructure.Decode(data, &advertise)
if err != nil {
return err
}

if advertise.Uri == "" {
return errors.New("invalid connection url")
}

cleanedURL, err := util.RemoveScheme(advertise.Uri)
if err != nil {
return err
}

connectCtx.Network, connectCtx.Address = libconnect.ParseBaseURI(cleanedURL)

if (advertise.Params.Transport != "ssl") && (advertise.Params.Transport != "plain") {
return errors.New("transport must be ssl or plain")
}

if advertise.Params.Transport == "ssl" {
connectCtx.Transport = TransportSsl

connectCtx.Ssl = Ssl{
KeyFile: advertise.Params.KeyFile,
CertFile: advertise.Params.CertFile,
CaFile: advertise.Params.CaFile,
}
}

return nil
}
Loading

0 comments on commit 46ee573

Please sign in to comment.