diff --git a/.chloggen/add-pubsub-ordering.yaml b/.chloggen/add-pubsub-ordering.yaml new file mode 100644 index 000000000000..35e366472082 --- /dev/null +++ b/.chloggen/add-pubsub-ordering.yaml @@ -0,0 +1,4 @@ +change_type: enhancement +component: googlecloudpubsubexporter +note: Add support for exporting ordered messages to GCP Pub/Sub +issues: [32850] diff --git a/exporter/googlecloudpubsubexporter/README.md b/exporter/googlecloudpubsubexporter/README.md index 3a494927e063..13c4e3dfe23f 100644 --- a/exporter/googlecloudpubsubexporter/README.md +++ b/exporter/googlecloudpubsubexporter/README.md @@ -19,7 +19,7 @@ This exporter sends OTLP messages to a Google Cloud [Pubsub](https://cloud.googl The following configuration options are supported: * `project` (Optional): The Google Cloud Project of the topics. -* `topic` (Required): The topic name to receive OTLP data over. The topic name should be a fully qualified resource +* `topic` (Required): The topic name to send OTLP data over. The topic name should be a fully qualified resource name (eg: `projects/otel-project/topics/otlp`). * `compression` (Optional): Set the payload compression, only `gzip` is supported. Default is no compression. * `watermark` Behaviour of how the `ce-time` attribute is set (see watermark section for more info) @@ -31,17 +31,25 @@ The following configuration options are supported: or switching between [global and regional service endpoints](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints). * `insecure` (Optional): allows performing “insecure” SSL connections and transfers, useful when connecting to a local emulator instance. Only has effect if Endpoint is not "" +* `ordering`: Configures the [PubSub ordering](https://cloud.google.com/pubsub/docs/ordering) feature, see + [ordering](#ordering) section for more info. + * `enabled` (default = `false`): Enables the ordering. Default is disabled. 
+ * `from_resource_attribute` (no default): resource attribute that will be used as the ordering key. Required when + `ordering.enabled` is `true`. If the resource attribute is missing or has an empty value, the messages will not be + ordered for this resource. + * `remove_resource_attribute` (default = `false`): if the ordering key resource attribute specified by + `from_resource_attribute` should be removed from the resource attributes. ```yaml exporters: googlecloudpubsub: project: my-project - topic: otlp-traces + topic: projects/my-project/topics/otlp-traces ``` ## Pubsub topic -The Google Cloud [Pubsub](https://cloud.google.com/pubsub) export doesn't automatic create topics, it expects the topic +The Google Cloud [Pubsub](https://cloud.google.com/pubsub) exporter doesn't automatically create topics, it expects the topic to be created upfront. Security wise it's best to give the collector its own service account and give the topic `Pub/Sub Publisher` permission. @@ -74,11 +82,11 @@ up to 20% of the cost. This can be done by setting the `compression` to `gzip`. exporters: googlecloudpubsub: project: my-project - topic: otlp-traces + topic: projects/my-project/topics/otlp-traces compression: gzip ``` -The exporter with add the `content-encoding` attribute to the message. The receiver will look at this attribute +The exporter will add the `content-encoding` attribute to the message. The receiver will look at this attribute to detect the compression that is used on the payload. Only `gzip` is supported. @@ -100,7 +108,7 @@ timestamp , if you want to behaviour to have effect as the default is `0s`. exporters: googlecloudpubsub: project: my-project - topic: otlp-traces + topic: projects/my-project/topics/otlp-traces watermark: behavior: earliest allow_drift: 1h @@ -119,3 +127,35 @@ scenario is `behavior: earliest` with a reasonable `allow_drift` of `1h`. Allowed behavior values are `current` or `earliest`. 
For `allow_drift` the default is `0s`, so make sure to set the value. + +## Ordering + +When ordering is enabled (`ordering.enabled`), you are required to specify a resource attribute key that will be used as +the ordering key (`ordering.from_resource_attribute`). If this resource attribute is only meant to be used as an +ordering key, you may want to choose to get this resource attribute key (`ordering.from_resource_attribute`) removed +before publishing to PubSub by enabling the `ordering.remove_resource_attribute` configuration. + +```yaml +exporters: + googlecloudpubsub: + project: my-project + topic: projects/my-project/topics/otlp-traces + ordering: + enabled: true + from_resource_attribute: some.resource.attribute.key + remove_resource_attribute: true +``` + +### Notes + +While the PubSub topic doesn't require any configuration for ordering, you will need to enable ordering on your +subscription(s) if you need it. Enabling ordering on a subscription is only possible at creation. +For composite ordering keys you'd need to compose the resource attribute value before exporting e.g., by using a +[transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) +. + +Empty values in the ordering key are accepted but won't be ordered, see [PubSub ordering documentation](https://cloud.google.com/pubsub/docs/ordering) +for more details. + +PubSub requires one publish request per ordering key value, so this exporter groups the signals per ordering key before +publishing. 
diff --git a/exporter/googlecloudpubsubexporter/config.go b/exporter/googlecloudpubsubexporter/config.go index 829e659a6238..b3c7e2c76a57 100644 --- a/exporter/googlecloudpubsubexporter/config.go +++ b/exporter/googlecloudpubsubexporter/config.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/multierr" ) var topicMatcher = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`) @@ -34,6 +35,8 @@ type Config struct { Compression string `mapstructure:"compression"` // Watermark defines the watermark (the ce-time attribute on the message) behavior Watermark WatermarkConfig `mapstructure:"watermark"` + // Ordering configures the ordering keys + Ordering OrderingConfig `mapstructure:"ordering"` } // WatermarkConfig customizes the behavior of the watermark @@ -46,15 +49,27 @@ type WatermarkConfig struct { AllowedDrift time.Duration `mapstructure:"allowed_drift"` } +// OrderingConfig customizes the behavior of the ordering +type OrderingConfig struct { + // Enabled indicates if ordering is enabled + Enabled bool `mapstructure:"enabled"` + // FromResourceAttribute is a resource attribute that will be used as the ordering key. + FromResourceAttribute string `mapstructure:"from_resource_attribute"` + // RemoveResourceAttribute indicates if the ordering key should be removed from the resource attributes. 
+ RemoveResourceAttribute bool `mapstructure:"remove_resource_attribute"` +} + func (config *Config) Validate() error { + var errors error if !topicMatcher.MatchString(config.Topic) { - return fmt.Errorf("topic '%s' is not a valid format, use 'projects//topics/'", config.Topic) + errors = multierr.Append(errors, fmt.Errorf("topic '%s' is not a valid format, use 'projects//topics/'", config.Topic)) } - _, err := config.parseCompression() - if err != nil { - return err + if _, err := config.parseCompression(); err != nil { + errors = multierr.Append(errors, err) } - return config.Watermark.validate() + errors = multierr.Append(errors, config.Watermark.validate()) + errors = multierr.Append(errors, config.Ordering.validate()) + return errors } func (config *WatermarkConfig) validate() error { @@ -65,6 +80,13 @@ func (config *WatermarkConfig) validate() error { return err } +func (cfg *OrderingConfig) validate() error { + if cfg.Enabled && cfg.FromResourceAttribute == "" { + return fmt.Errorf("'from_resource_attribute' is required if ordering is enabled") + } + return nil +} + func (config *Config) parseCompression() (compression, error) { switch config.Compression { case "gzip": diff --git a/exporter/googlecloudpubsubexporter/config_test.go b/exporter/googlecloudpubsubexporter/config_test.go index 0a7943d000db..dbd7cad265f7 100644 --- a/exporter/googlecloudpubsubexporter/config_test.go +++ b/exporter/googlecloudpubsubexporter/config_test.go @@ -34,18 +34,21 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NoError(t, sub.Unmarshal(cfg)) - customConfig := factory.CreateDefaultConfig().(*Config) + expectedConfig := factory.CreateDefaultConfig().(*Config) - customConfig.ProjectID = "my-project" - customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}" - customConfig.TimeoutSettings = exporterhelper.TimeoutConfig{ + expectedConfig.ProjectID = "my-project" + expectedConfig.UserAgent = "opentelemetry-collector-contrib {{version}}" + 
expectedConfig.TimeoutSettings = exporterhelper.TimeoutConfig{ Timeout: 20 * time.Second, } - customConfig.Topic = "projects/my-project/topics/otlp-topic" - customConfig.Compression = "gzip" - customConfig.Watermark.Behavior = "earliest" - customConfig.Watermark.AllowedDrift = time.Hour - assert.Equal(t, cfg, customConfig) + expectedConfig.Topic = "projects/my-project/topics/otlp-topic" + expectedConfig.Compression = "gzip" + expectedConfig.Watermark.Behavior = "earliest" + expectedConfig.Watermark.AllowedDrift = time.Hour + expectedConfig.Ordering.Enabled = true + expectedConfig.Ordering.FromResourceAttribute = "ordering_key" + expectedConfig.Ordering.RemoveResourceAttribute = true + assert.Equal(t, expectedConfig, cfg) } func TestTopicConfigValidation(t *testing.T) { @@ -100,3 +103,14 @@ func TestWatermarkDefaultMaxDriftValidation(t *testing.T) { assert.NoError(t, c.Validate()) assert.Equal(t, time.Duration(9223372036854775807), c.Watermark.AllowedDrift) } + +func TestOrderConfigValidation(t *testing.T) { + factory := NewFactory() + c := factory.CreateDefaultConfig().(*Config) + c.Topic = "projects/project/topics/my-topic" + assert.NoError(t, c.Validate()) + c.Ordering.Enabled = true + assert.Error(t, c.Validate()) + c.Ordering.FromResourceAttribute = "key" + assert.NoError(t, c.Validate()) +} diff --git a/exporter/googlecloudpubsubexporter/exporter.go b/exporter/googlecloudpubsubexporter/exporter.go index f2b94d7cf4f3..f9cbe363cc16 100644 --- a/exporter/googlecloudpubsubexporter/exporter.go +++ b/exporter/googlecloudpubsubexporter/exporter.go @@ -85,18 +85,14 @@ func (ex *pubsubExporter) shutdown(_ context.Context) error { return client.Close() } -func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error { - if len(data) == 0 { - return nil - } - +func (ex *pubsubExporter) getMessageAttributes(encoding encoding, watermark time.Time) (map[string]string, error) { id, err := ex.makeUUID() if err != nil 
{ - return err + return nil, err } ceTime, err := watermark.MarshalText() if err != nil { - return err + return nil, err } attributes := map[string]string{ @@ -118,21 +114,199 @@ func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, } if ex.ceCompression == gZip { attributes["content-encoding"] = "gzip" - data, err = ex.compress(data) - if err != nil { + } + return attributes, err +} + +func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Traces) error { + if !ex.config.Ordering.Enabled { + return ex.publishTraces(ctx, traces, "") + } + + tracesByOrderingKey := map[string]ptrace.Traces{ + "": ptrace.NewTraces(), + } + traces.ResourceSpans().RemoveIf(func(resourceSpans ptrace.ResourceSpans) bool { + orderingKey, found := resourceSpans.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute) + if !found { + return false + } + + orderingKeyValue := orderingKey.AsString() + if _, exists := tracesByOrderingKey[orderingKeyValue]; !exists { + tracesByOrderingKey[orderingKeyValue] = ptrace.NewTraces() + } + + if ex.config.Ordering.RemoveResourceAttribute { + _ = resourceSpans.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute) + } + + resourceSpans.MoveTo(tracesByOrderingKey[orderingKeyValue].ResourceSpans().AppendEmpty()) + return true + }) + + // No ordering key + if traces.SpanCount() > 0 { + traces.ResourceSpans().MoveAndAppendTo(tracesByOrderingKey[""].ResourceSpans()) + } + + for key, tracesForKey := range tracesByOrderingKey { + if err := ex.publishTraces(ctx, tracesForKey, key); err != nil { + return err + } + } + return nil +} + +func (ex *pubsubExporter) publishTraces(ctx context.Context, tracesForKey ptrace.Traces, orderingKey string) error { + watermark := ex.tracesWatermarkFunc(tracesForKey, time.Now(), ex.config.Watermark.AllowedDrift).UTC() + attributes, attributesErr := ex.getMessageAttributes(otlpProtoTrace, watermark) + if attributesErr != nil { + return fmt.Errorf("error 
while preparing pubsub message attributes: %w", attributesErr) + } + + data, err := ex.tracesMarshaler.MarshalTraces(tracesForKey) + if err != nil { + return fmt.Errorf("error while marshaling traces: %w", err) + } + + return ex.publishMessage(ctx, data, attributes, orderingKey) +} + +func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { + if !ex.config.Ordering.Enabled { + return ex.publishMetrics(ctx, metrics, "") + } + + metricsByOrderingKey := map[string]pmetric.Metrics{ + "": pmetric.NewMetrics(), + } + metrics.ResourceMetrics().RemoveIf(func(resourceMetrics pmetric.ResourceMetrics) bool { + orderingKey, found := resourceMetrics.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute) + if !found { + return false + } + + orderingKeyValue := orderingKey.AsString() + if _, exists := metricsByOrderingKey[orderingKeyValue]; !exists { + metricsByOrderingKey[orderingKeyValue] = pmetric.NewMetrics() + } + + if ex.config.Ordering.RemoveResourceAttribute { + _ = resourceMetrics.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute) + } + + resourceMetrics.MoveTo(metricsByOrderingKey[orderingKeyValue].ResourceMetrics().AppendEmpty()) + return true + }) + + // No ordering key + if metrics.DataPointCount() > 0 { + metrics.ResourceMetrics().MoveAndAppendTo(metricsByOrderingKey[""].ResourceMetrics()) + } + + for key, metricsForKey := range metricsByOrderingKey { + if err := ex.publishMetrics(ctx, metricsForKey, key); err != nil { + return err + } + } + return nil +} + +func (ex *pubsubExporter) publishMetrics(ctx context.Context, metricsForKey pmetric.Metrics, orderingKey string) error { + watermark := ex.metricsWatermarkFunc(metricsForKey, time.Now(), ex.config.Watermark.AllowedDrift).UTC() + attributes, attributesErr := ex.getMessageAttributes(otlpProtoMetric, watermark) + if attributesErr != nil { + return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr) + } + + data, 
err := ex.metricsMarshaler.MarshalMetrics(metricsForKey) + if err != nil { + return fmt.Errorf("error while marshaling metrics: %w", err) + } + + return ex.publishMessage(ctx, data, attributes, orderingKey) +} + +func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error { + if !ex.config.Ordering.Enabled { + return ex.publishLogs(ctx, logs, "") + } + + logsByOrderingKey := map[string]plog.Logs{ + "": plog.NewLogs(), + } + if ex.config.Ordering.Enabled { + logs.ResourceLogs().RemoveIf(func(resourceLogs plog.ResourceLogs) bool { + orderingKey, found := resourceLogs.Resource().Attributes().Get(ex.config.Ordering.FromResourceAttribute) + if !found { + return false + } + + orderingKeyValue := orderingKey.AsString() + if _, exists := logsByOrderingKey[orderingKeyValue]; !exists { + logsByOrderingKey[orderingKeyValue] = plog.NewLogs() + } + + if ex.config.Ordering.RemoveResourceAttribute { + _ = resourceLogs.Resource().Attributes().Remove(ex.config.Ordering.FromResourceAttribute) + } + + resourceLogs.MoveTo(logsByOrderingKey[orderingKeyValue].ResourceLogs().AppendEmpty()) + return true + }) + } + + // No ordering key + if logs.LogRecordCount() > 0 { + logs.ResourceLogs().MoveAndAppendTo(logsByOrderingKey[""].ResourceLogs()) + } + + for key, logsForKey := range logsByOrderingKey { + if err := ex.publishLogs(ctx, logsForKey, key); err != nil { return err } } - _, err = ex.client.Publish(ctx, &pubsubpb.PublishRequest{ + return nil +} + +func (ex *pubsubExporter) publishLogs(ctx context.Context, logs plog.Logs, orderingKey string) error { + watermark := ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC() + attributes, attributesErr := ex.getMessageAttributes(otlpProtoLog, watermark) + if attributesErr != nil { + return fmt.Errorf("error while preparing pubsub message attributes: %w", attributesErr) + } + + data, err := ex.logsMarshaler.MarshalLogs(logs) + if err != nil { + return fmt.Errorf("error while marshaling logs: %w", 
err) + } + + return ex.publishMessage(ctx, data, attributes, orderingKey) +} + +func (ex *pubsubExporter) publishMessage(ctx context.Context, data []byte, attributes map[string]string, orderingKey string) error { + if len(data) == 0 { + return nil + } + + data, compressErr := ex.compress(data) + if compressErr != nil { + return fmt.Errorf("error while compressing pubsub message payload: %w", compressErr) + } + + _, publishErr := ex.client.Publish(ctx, &pubsubpb.PublishRequest{ Topic: ex.config.Topic, - Messages: []*pubsubpb.PubsubMessage{ - { - Attributes: attributes, - Data: data, - }, - }, + Messages: []*pubsubpb.PubsubMessage{{ + Attributes: attributes, + OrderingKey: orderingKey, + Data: data, + }}, }) - return err + if publishErr != nil { + return fmt.Errorf("failed to publish pubsub message for ordering key %q: %w", orderingKey, publishErr) + } + return nil } func (ex *pubsubExporter) compress(payload []byte) ([]byte, error) { @@ -151,27 +325,3 @@ func (ex *pubsubExporter) compress(payload []byte) ([]byte, error) { } return payload, nil } - -func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Traces) error { - buffer, err := ex.tracesMarshaler.MarshalTraces(traces) - if err != nil { - return err - } - return ex.publishMessage(ctx, otlpProtoTrace, buffer, ex.tracesWatermarkFunc(traces, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) -} - -func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { - buffer, err := ex.metricsMarshaler.MarshalMetrics(metrics) - if err != nil { - return err - } - return ex.publishMessage(ctx, otlpProtoMetric, buffer, ex.metricsWatermarkFunc(metrics, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) -} - -func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error { - buffer, err := ex.logsMarshaler.MarshalLogs(logs) - if err != nil { - return err - } - return ex.publishMessage(ctx, otlpProtoLog, buffer, ex.logsWatermarkFunc(logs, time.Now(), 
ex.config.Watermark.AllowedDrift).UTC()) -} diff --git a/exporter/googlecloudpubsubexporter/exporter_test.go b/exporter/googlecloudpubsubexporter/exporter_test.go index b5c92068f4c9..8a9144119c63 100644 --- a/exporter/googlecloudpubsubexporter/exporter_test.go +++ b/exporter/googlecloudpubsubexporter/exporter_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "testing" + "time" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "github.com/google/uuid" @@ -28,6 +29,81 @@ const ( defaultTopic = "projects/my-project/topics/otlp" ) +func TestGetMessageAttributes(t *testing.T) { + date := time.Date(2021, time.January, 1, 2, 3, 4, 5, time.UTC) + + t.Run("logs", func(t *testing.T) { + exporter, _ := newTestExporter(t) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoLog, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.logs.v1", + "content-type": "application/protobuf", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) + + t.Run("metrics", func(t *testing.T) { + exporter, _ := newTestExporter(t) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoMetric, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.metrics.v1", + "content-type": "application/protobuf", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) + + t.Run("traces", func(t *testing.T) { + exporter, _ := newTestExporter(t) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoTrace, date) + require.NoError(t, err) + + expectedAttributes := 
map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.traces.v1", + "content-type": "application/protobuf", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) + + t.Run("logs with compression", func(t *testing.T) { + exporter, _ := newTestExporter(t, func(cfg *Config) { + cfg.Compression = "gzip" + }) + + gotAttributes, err := exporter.getMessageAttributes(otlpProtoLog, date) + require.NoError(t, err) + + expectedAttributes := map[string]string{ + "ce-id": "00000000-0000-0000-0000-000000000000", + "ce-source": "/opentelemetry/collector/googlecloudpubsub/latest", + "ce-specversion": "1.0", + "ce-time": "2021-01-01T02:03:04.000000005Z", + "ce-type": "org.opentelemetry.otlp.logs.v1", + "content-type": "application/protobuf", + "content-encoding": "gzip", + } + assert.Equal(t, expectedAttributes, gotAttributes) + }) +} + func TestExporterNoData(t *testing.T) { exporter, publisher := newTestExporter(t, func(config *Config) { config.Watermark.Behavior = "earliest" @@ -201,6 +277,163 @@ func TestExporterSimpleDataWithCompression(t *testing.T) { }) } +func TestExporterWithOrdering(t *testing.T) { + const orderingKey = "ordering.key" + withOrdering := func(cfg *Config) { + cfg.Ordering.Enabled = true + cfg.Ordering.FromResourceAttribute = orderingKey + cfg.Ordering.RemoveResourceAttribute = true + } + + ctx := context.Background() + + t.Run("logs", func(t *testing.T) { + exporter, publisher := newTestExporter(t, withOrdering) + + logs := plog.NewLogs() + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 2") + } + { + 
resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr(orderingKey, "") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message without ordering key 2") + } + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr(orderingKey, "value 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 1 with ordering key 1") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 2 with ordering key 1") + } + { + resourceLogs := logs.ResourceLogs().AppendEmpty() + resourceLogs.Resource().Attributes().PutStr(orderingKey, "value 2") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 1 with ordering key 2") + resourceLogs.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("some log message 2 with ordering key 2") + } + + require.NoError(t, exporter.consumeLogs(ctx, logs)) + require.Len(t, publisher.requests, 3, "one publish call per ordering key should've been made") + + var orderingKeyValues []string + for _, request := range publisher.requests { + assert.Equal(t, defaultTopic, request.Topic) + assert.Len(t, request.Messages, 1) + + for _, msg := range request.Messages { + orderingKeyValues = append(orderingKeyValues, msg.OrderingKey) + + assert.NotEmpty(t, msg.Data) + assert.NotEmpty(t, msg.Attributes) + } + } + assert.ElementsMatch(t, orderingKeyValues, []string{"", "value 1", "value 2"}) + }) + + t.Run("metrics", func(t *testing.T) { + exporter, publisher := newTestExporter(t, withOrdering) + + metrics := pmetric.NewMetrics() + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + 
metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceMetrics.Resource().Attributes().PutStr(orderingKey, "") + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceMetrics.Resource().Attributes().PutStr(orderingKey, "value 1") + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + { + resourceMetrics := metrics.ResourceMetrics().AppendEmpty() + resourceMetrics.Resource().Attributes().PutStr(orderingKey, "value 2") + metric := resourceMetrics.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + metric.SetName("some.metric") + metric.SetEmptyGauge().DataPoints().AppendEmpty().SetIntValue(42) + metric.Gauge().DataPoints().AppendEmpty().SetIntValue(24) + } + + require.NoError(t, exporter.consumeMetrics(ctx, metrics)) + require.Len(t, publisher.requests, 3, "one publish call per ordering key should've been made") + + var orderingKeyValues []string + for _, request := range publisher.requests { + assert.Equal(t, defaultTopic, request.Topic) + assert.Len(t, request.Messages, 1) + + for _, msg := range request.Messages { + orderingKeyValues = append(orderingKeyValues, msg.OrderingKey) + + assert.NotEmpty(t, msg.Data) + assert.NotEmpty(t, msg.Attributes) + } + } + assert.ElementsMatch(t, orderingKeyValues, []string{"", "value 1", "value 2"}) + }) + + t.Run("traces", func(t *testing.T) { + exporter, publisher := newTestExporter(t, withOrdering) 
+ + traces := ptrace.NewTraces() + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 1") + } + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceSpans.Resource().Attributes().PutStr(orderingKey, "") + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 1") + } + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceSpans.Resource().Attributes().PutStr(orderingKey, "value 1") + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 2") + } + { + resourceSpans := traces.ResourceSpans().AppendEmpty() + resourceSpans.Resource().Attributes().PutStr(orderingKey, "value 2") + span := resourceSpans.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + span.SetName("some span 3") + } + + require.NoError(t, exporter.consumeTraces(ctx, traces)) + require.Len(t, publisher.requests, 3, "one publish call per ordering key should've been made") + + var orderingKeyValues []string + for _, request := range publisher.requests { + assert.Equal(t, defaultTopic, request.Topic) + assert.Len(t, request.Messages, 1) + + for _, msg := range request.Messages { + orderingKeyValues = append(orderingKeyValues, msg.OrderingKey) + + assert.NotEmpty(t, msg.Data) + assert.NotEmpty(t, msg.Attributes) + } + } + assert.ElementsMatch(t, orderingKeyValues, []string{"", "value 1", "value 2"}) + }) +} + // Helpers func newTestExporter(t *testing.T, options ...func(*Config)) (*pubsubExporter, *mockPublisher) { diff --git a/exporter/googlecloudpubsubexporter/factory.go b/exporter/googlecloudpubsubexporter/factory.go index b60be1b0a8bc..16a2112e9680 100644 --- a/exporter/googlecloudpubsubexporter/factory.go +++ b/exporter/googlecloudpubsubexporter/factory.go @@ -40,11 +40,11 @@ func NewFactory() exporter.Factory { var exporters = map[*Config]*pubsubExporter{} func ensureExporter(params 
exporter.Settings, pCfg *Config) *pubsubExporter { - receiver := exporters[pCfg] - if receiver != nil { - return receiver + exp := exporters[pCfg] + if exp != nil { + return exp } - receiver = &pubsubExporter{ + exp = &pubsubExporter{ logger: params.Logger, userAgent: strings.ReplaceAll(pCfg.UserAgent, "{{version}}", params.BuildInfo.Version), ceSource: fmt.Sprintf("/opentelemetry/collector/%s/%s", metadata.Type.String(), params.BuildInfo.Version), @@ -56,20 +56,20 @@ func ensureExporter(params exporter.Settings, pCfg *Config) *pubsubExporter { makeClient: newPublisherClient, } // we ignore the error here as the config is already validated with the same method - receiver.ceCompression, _ = pCfg.parseCompression() + exp.ceCompression, _ = pCfg.parseCompression() watermarkBehavior, _ := pCfg.Watermark.parseWatermarkBehavior() switch watermarkBehavior { case earliest: - receiver.tracesWatermarkFunc = earliestTracesWatermark - receiver.metricsWatermarkFunc = earliestMetricsWatermark - receiver.logsWatermarkFunc = earliestLogsWatermark + exp.tracesWatermarkFunc = earliestTracesWatermark + exp.metricsWatermarkFunc = earliestMetricsWatermark + exp.logsWatermarkFunc = earliestLogsWatermark case current: - receiver.tracesWatermarkFunc = currentTracesWatermark - receiver.metricsWatermarkFunc = currentMetricsWatermark - receiver.logsWatermarkFunc = currentLogsWatermark + exp.tracesWatermarkFunc = currentTracesWatermark + exp.metricsWatermarkFunc = currentMetricsWatermark + exp.logsWatermarkFunc = currentLogsWatermark } - exporters[pCfg] = receiver - return receiver + exporters[pCfg] = exp + return exp } // createDefaultConfig creates the default configuration for exporter. 
@@ -81,70 +81,63 @@ func createDefaultConfig() component.Config { Behavior: "current", AllowedDrift: 0, }, + Ordering: OrderingConfig{ + Enabled: false, + FromResourceAttribute: "", + RemoveResourceAttribute: false, + }, } } -func createTracesExporter( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (exporter.Traces, error) { +func createTracesExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Traces, error) { pCfg := cfg.(*Config) - pubsubExporter := ensureExporter(set, pCfg) + exp := ensureExporter(set, pCfg) return exporterhelper.NewTraces( ctx, set, cfg, - pubsubExporter.consumeTraces, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exp.consumeTraces, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), exporterhelper.WithQueue(pCfg.QueueSettings), - exporterhelper.WithStart(pubsubExporter.start), - exporterhelper.WithShutdown(pubsubExporter.shutdown), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown), ) } -func createMetricsExporter( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (exporter.Metrics, error) { +func createMetricsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Metrics, error) { pCfg := cfg.(*Config) - pubsubExporter := ensureExporter(set, pCfg) + exp := ensureExporter(set, pCfg) return exporterhelper.NewMetrics( ctx, set, cfg, - pubsubExporter.consumeMetrics, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exp.consumeMetrics, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), exporterhelper.WithQueue(pCfg.QueueSettings), - exporterhelper.WithStart(pubsubExporter.start), - 
exporterhelper.WithShutdown(pubsubExporter.shutdown), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown), ) } -func createLogsExporter( - ctx context.Context, - set exporter.Settings, - cfg component.Config, -) (exporter.Logs, error) { +func createLogsExporter(ctx context.Context, set exporter.Settings, cfg component.Config) (exporter.Logs, error) { pCfg := cfg.(*Config) - pubsubExporter := ensureExporter(set, pCfg) + exp := ensureExporter(set, pCfg) return exporterhelper.NewLogs( ctx, set, cfg, - pubsubExporter.consumeLogs, - exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + exp.consumeLogs, + exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), exporterhelper.WithQueue(pCfg.QueueSettings), - exporterhelper.WithStart(pubsubExporter.start), - exporterhelper.WithShutdown(pubsubExporter.shutdown), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.shutdown), ) } diff --git a/exporter/googlecloudpubsubexporter/go.mod b/exporter/googlecloudpubsubexporter/go.mod index 27611b3a04fc..281377ade2ff 100644 --- a/exporter/googlecloudpubsubexporter/go.mod +++ b/exporter/googlecloudpubsubexporter/go.mod @@ -16,6 +16,7 @@ require ( go.opentelemetry.io/collector/exporter/exportertest v0.120.1-0.20250226024140-8099e51f9a77 go.opentelemetry.io/collector/pdata v1.26.1-0.20250226024140-8099e51f9a77 go.uber.org/goleak v1.3.0 + go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 google.golang.org/api v0.223.0 google.golang.org/grpc v1.70.0 @@ -65,7 +66,6 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect - go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.33.0 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/oauth2 v0.26.0 // indirect diff --git 
a/exporter/googlecloudpubsubexporter/testdata/config.yaml b/exporter/googlecloudpubsubexporter/testdata/config.yaml index 63e43036e294..765192434f93 100644 --- a/exporter/googlecloudpubsubexporter/testdata/config.yaml +++ b/exporter/googlecloudpubsubexporter/testdata/config.yaml @@ -8,3 +8,7 @@ googlecloudpubsub/customname: watermark: behavior: earliest allowed_drift: 1h + ordering: + enabled: true + from_resource_attribute: ordering_key + remove_resource_attribute: true