[exporter/stefexporter] Add basic STEF exporter implementation
#### Description

Added a STEF exporter implementation for metrics, sending data over a gRPC stream. For now, only the timeout, queueing and retry exporter helpers are used. We will need to decide later whether other helpers are needed for this exporter.
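For reference, a rough sketch of how these helpers are typically wired up via `exporterhelper` in a metrics exporter factory. This is illustrative only: the function name `createMetrics` and the exact option wiring are assumptions and may not match the actual `factory.go` in this change.

```go
package stefexporter

import (
	"context"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// createMetrics sketches the usual wiring: the exporter itself only provides
// Start, Shutdown and the push function, while timeout, queueing and retry
// behavior come from exporterhelper options driven by the Config fields.
func createMetrics(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Metrics, error) {
	c := cfg.(*Config)
	exp := newStefExporter(set.TelemetrySettings, c)
	return exporterhelper.NewMetrics(
		ctx, set, cfg,
		exp.exportMetrics,
		exporterhelper.WithStart(exp.Start),
		exporterhelper.WithShutdown(exp.Shutdown),
		exporterhelper.WithTimeout(c.TimeoutConfig),
		exporterhelper.WithQueue(c.QueueConfig),
		exporterhelper.WithRetry(c.RetryConfig),
	)
}
```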

#### Testing

Unit tests that verify connecting, reconnecting, sending, and acking of data are included.

#### Documentation

Added to README.

#### Future Work

More extensive test coverage is desirable and will likely be added in the future.

We likely want to implement a STEF receiver and add STEF as a tested protocol to our testbed.

A full-duplex implementation is desirable: one that takes advantage of the streaming nature of the STEF protocol and does not block waiting for acks.
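As a purely illustrative sketch of that idea (not part of this change, all names below are hypothetical): rather than blocking in exportMetrics, the exporter could register the expected ack ID for each written batch and complete pending requests asynchronously as acks arrive.

```go
package stefexporter

import "sync"

// ackTracker is a hypothetical helper for a full-duplex exporter: writers
// register the expected ack ID for a batch and receive a channel that is
// completed asynchronously when the ack arrives, instead of blocking the
// STEF stream.
type ackTracker struct {
	mu      sync.Mutex
	pending map[uint64]chan error // keyed by expected ack ID
}

// track registers a batch that expects ackID and returns its completion channel.
func (t *ackTracker) track(ackID uint64) <-chan error {
	ch := make(chan error, 1)
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.pending == nil {
		t.pending = make(map[uint64]chan error)
	}
	t.pending[ackID] = ch
	return ch
}

// onAck completes every pending batch whose expected ack ID is <= ackID,
// since an ack covers all records written up to that ID.
func (t *ackTracker) onAck(ackID uint64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for id, ch := range t.pending {
		if id <= ackID {
			ch <- nil
			delete(t.pending, id)
		}
	}
}
```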
tigrannajaryan committed Feb 6, 2025
1 parent 2a42490 commit 62bcc5a
Showing 9 changed files with 752 additions and 15 deletions.
27 changes: 27 additions & 0 deletions .chloggen/tigran_stefexporter.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: stefexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add basic STEF exporter implementation

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37759]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
46 changes: 46 additions & 0 deletions exporter/stefexporter/README.md
@@ -13,3 +13,49 @@

Export data via gRPC using the
[Otel/STEF format](https://github.com/splunk/stef/tree/main/go/otel).

## Getting Started

The following settings are required:

- `endpoint` (no default): host:port to which the exporter is going to send STEF metric data,
using the STEF/gRPC protocol. The valid syntax is described
[here](https://github.com/grpc/grpc/blob/master/doc/naming.md).
If a scheme of `https` is used then client transport security is enabled and overrides the `insecure` setting.
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
for the full set of available options.

Example:

```yaml
exporters:
  stef:
    endpoint: otelcol2:4317
    tls:
      cert_file: file.cert
      key_file: file.key
  stef/2:
    endpoint: otelcol2:4317
    tls:
      insecure: true
```

By default, no compression is enabled. The only supported compression method is zstd.
To enable it, configure as follows:

```yaml
exporters:
  stef:
    ...
    compression: zstd
```

## Advanced Configuration
Several helper files are leveraged to provide additional capabilities automatically:
- [gRPC settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configgrpc/README.md)
- [TLS and mTLS settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md)
- [Queuing, timeout and retry settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md).
  Note that the `timeout` setting controls how long the exporter waits for the ACK of data sent
  over the STEF/gRPC stream.
14 changes: 13 additions & 1 deletion exporter/stefexporter/config.go
@@ -13,11 +13,16 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for the STEF exporter.
type Config struct {
-configgrpc.ClientConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutConfig `mapstructure:",squash"`
exporterhelper.QueueConfig `mapstructure:"sending_queue"`
RetryConfig configretry.BackOffConfig `mapstructure:"retry_on_failure"`
configgrpc.ClientConfig `mapstructure:",squash"`
}

var _ component.Config = (*Config)(nil)
@@ -38,6 +43,13 @@ func (c *Config) Validate() error {
return fmt.Errorf(`invalid port "%s"`, port)
}

switch c.Compression {
case "":
case "zstd":
default:
return fmt.Errorf("unsupported compression method %q", c.Compression)
}

return nil
}

243 changes: 237 additions & 6 deletions exporter/stefexporter/exporter.go
@@ -5,26 +5,257 @@ package stefexporter // import "github.com/open-telemetry/opentelemetry-collecto

import (
"context"
"fmt"
"sync"

stefgrpc "github.com/splunk/stef/go/grpc"
"github.com/splunk/stef/go/grpc/stef_proto"
"github.com/splunk/stef/go/otel/oteltef"
stefpdatametrics "github.com/splunk/stef/go/pdata/metrics"
stefpkg "github.com/splunk/stef/go/pkg"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stefexporter/internal"
)

-type stefExporter struct{}
// stefExporter implements sending metrics over STEF/gRPC stream.
//
// The exporter uses a single stream and accepts concurrent exportMetrics calls,
// sequencing the metric data as needed over a single stream.
//
// The exporter will block the exportMetrics call until an acknowledgement is
// received from the destination.
//
// The exporter relies on a preceding Retry helper to retry sending data that is
// not acknowledged or otherwise fails to be sent. The exporter will not retry
// sending the data itself.
type stefExporter struct {
set component.TelemetrySettings
cfg *Config
compression stefpkg.Compression

// connMutex is taken when connecting, disconnecting or checking connection status.
connMutex sync.Mutex
isConnected bool
connID uint64
grpcConn *grpc.ClientConn
client *stefgrpc.Client

// The STEF writer we write metrics to and which in turn sends them over gRPC.
stefWriter *oteltef.MetricsWriter
stefWriterMutex sync.Mutex // protects stefWriter

// lastAckID is the maximum ack ID received so far.
lastAckID uint64
// Cond to protect and signal lastAckID.
ackCond *internal.CancellableCond
}

type loggerWrapper struct {
logger *zap.Logger
}

func (w *loggerWrapper) Debugf(_ context.Context, format string, v ...any) {
w.logger.Debug(fmt.Sprintf(format, v...))
}

func (w *loggerWrapper) Errorf(_ context.Context, format string, v ...any) {
w.logger.Error(fmt.Sprintf(format, v...))
}

func newStefExporter(set component.TelemetrySettings, cfg *Config) *stefExporter {
exp := &stefExporter{
set: set,
cfg: cfg,
ackCond: internal.NewCancellableCond(),
}

-func newStefExporter(_ *zap.Logger, _ *Config) *stefExporter {
-    return &stefExporter{}
exp.compression = stefpkg.CompressionNone
if cfg.Compression == "zstd" {
exp.compression = stefpkg.CompressionZstd
}
return exp
}

-func (s *stefExporter) Start(_ context.Context, _ component.Host) error {
func (s *stefExporter) Start(ctx context.Context, host component.Host) error {
// Prepare gRPC connection.
var err error
s.grpcConn, err = s.cfg.ClientConfig.ToClientConn(ctx, host, s.set)
if err != nil {
return err
}

// No need to block Start(), we will begin connection attempt in a goroutine.
go func() {
if err := s.ensureConnected(ctx); err != nil {
s.set.Logger.Error("Error connecting to destination", zap.Error(err))
// This is not a fatal error. exportMetrics() will try to connect again as needed.
}
}()
return nil
}

func (s *stefExporter) Shutdown(ctx context.Context) error {
s.disconnect(ctx)
if s.grpcConn != nil {
if err := s.grpcConn.Close(); err != nil {
s.set.Logger.Error("Failed to close grpc connection", zap.Error(err))
}
s.grpcConn = nil
}
return nil
}

func (s *stefExporter) ensureConnected(ctx context.Context) error {
s.connMutex.Lock()
defer s.connMutex.Unlock()

if s.isConnected {
return nil
}

s.set.Logger.Debug("Connecting to destination", zap.String("endpoint", s.cfg.Endpoint))

s.ackCond.Cond.L.Lock()
// Reset lastAckID. New STEF stream ack IDs will start from 1.
s.lastAckID = 0
// Increment connection ID, to make sure we don't confuse the new and
// previous (stale) connections.
s.connID++
connID := s.connID
s.ackCond.Cond.L.Unlock()

// Prepare to open a STEF/gRPC stream to the server.
grpcClient := stef_proto.NewSTEFDestinationClient(s.grpcConn)

// Let server know about our schema.
schema, err := oteltef.MetricsWireSchema()
if err != nil {
return err
}

settings := stefgrpc.ClientSettings{
Logger: &loggerWrapper{s.set.Logger},
GrpcClient: grpcClient,
ClientSchema: schema,
Callbacks: stefgrpc.ClientCallbacks{
OnAck: func(ackId uint64) error { return s.onGrpcAck(connID, ackId) },
},
}
s.client = stefgrpc.NewClient(settings)

grpcWriter, opts, err := s.client.Connect(ctx)
if err != nil {
return fmt.Errorf("failed to connect to destination: %w", err)
}

opts.Compression = s.compression

// Create STEF record writer over gRPC.
s.stefWriter, err = oteltef.NewMetricsWriter(grpcWriter, opts)
if err != nil {
return err
}

s.isConnected = true
s.set.Logger.Debug("Connected to destination", zap.String("endpoint", s.cfg.Endpoint))

return nil
}

-func (s *stefExporter) Shutdown(_ context.Context) error {
func (s *stefExporter) disconnect(ctx context.Context) {
s.connMutex.Lock()
defer s.connMutex.Unlock()

if !s.isConnected {
return
}

if err := s.client.Disconnect(ctx); err != nil {
s.set.Logger.Error("Failed to disconnect", zap.Error(err))
}

s.set.Logger.Debug("Disconnected.")
s.isConnected = false
}

func (s *stefExporter) exportMetrics(ctx context.Context, md pmetric.Metrics) error {
if err := s.ensureConnected(ctx); err != nil {
return err
}

// stefWriter is not safe for concurrent writing, protect it.
s.stefWriterMutex.Lock()
defer s.stefWriterMutex.Unlock()

converter := stefpdatametrics.OtlpToTEFUnsorted{}
err := converter.WriteMetrics(md, s.stefWriter)
if err != nil {
// Error to write to STEF stream typically indicates either:
// 1) A problem with the connection. We need to reconnect.
// 2) Encoding failure, possibly due to encoder bug. In this case
// we need to reconnect too, to make sure encoders start from
// initial state, which is our best chance to succeed next time.
//
// We need to reconnect. Disconnect here and the next exportMetrics()
// call will connect again.
s.disconnect(ctx)

// TODO: check if err is because STEF encoding failed. If so we must not
// try to re-encode the same data. Return consumererror.NewPermanent(err)
// to the caller. This requires changes in STEF Go library.

// Return an error to retry sending these metrics again next time.
return err
}

// According to the STEF gRPC spec, destination ack IDs match the written record number.
// When the data we have just written is received by the destination, it will send us
// back an ack ID that numerically matches the last written record number.
expectedAckID := s.stefWriter.RecordCount()

// stefWriter normally buffers written records in memory. Flush() ensures buffered
// data is sent to network. This is necessary so that the server receives it and
// sends an acknowledgement back.
if err = s.stefWriter.Flush(); err != nil {
// Failure to write the gRPC stream normally means something is
// wrong with the connection. We need to reconnect. Disconnect here
// and the next exportMetrics() call will connect again.
s.disconnect(ctx)

// Return an error to retry sending these metrics again next time.
return err
}

// Wait for acknowledgement.
err = s.ackCond.Wait(ctx, func() bool { return s.lastAckID >= expectedAckID })
if err != nil {
return fmt.Errorf("error waiting for ack ID %d: %w", expectedAckID, err)
}

return nil
}

-func (s *stefExporter) pushMetrics(_ context.Context, _ pmetric.Metrics) error {
func (s *stefExporter) onGrpcAck(connID uint64, ackID uint64) error {
s.ackCond.Cond.L.Lock()
defer s.ackCond.Cond.L.Unlock()

if s.connID != connID {
// This is an ack from a previous (stale) connection. This can happen
// due to a race if the ack from the old stream arrives after we decided
// to reconnect but the old stream is still functioning. We just need
// to ignore this ack, it is no longer relevant.
return nil
}

// The IDs are expected to always monotonically increase. Check it anyway in case
// the server misbehaves and violates the expectation.
if s.lastAckID < ackID {
s.lastAckID = ackID
s.ackCond.Cond.Broadcast()
}
return nil
}
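
The `internal.CancellableCond` helper used above lives in one of the changed files not visible in this view. Going only by how it is used in this diff (a `Cond` field, `Broadcast`, and a context-aware `Wait` with a condition callback), a minimal sketch of such a helper could look like the following; the actual implementation in `internal/` may differ.

```go
package internal

import (
	"context"
	"sync"
)

// CancellableCond wraps sync.Cond so that waiting on a condition can be
// aborted when a context is cancelled.
type CancellableCond struct {
	Cond *sync.Cond
}

func NewCancellableCond() *CancellableCond {
	return &CancellableCond{Cond: sync.NewCond(&sync.Mutex{})}
}

// Wait blocks until eval() returns true or ctx is cancelled. eval is always
// called with the condition's lock held.
func (c *CancellableCond) Wait(ctx context.Context, eval func() bool) error {
	// Wake up the waiter when the context is cancelled so it can observe the error.
	stop := context.AfterFunc(ctx, func() {
		c.Cond.L.Lock()
		defer c.Cond.L.Unlock()
		c.Cond.Broadcast()
	})
	defer stop()

	c.Cond.L.Lock()
	defer c.Cond.L.Unlock()
	for !eval() {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		c.Cond.Wait()
	}
	return nil
}
```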
