From 3df6e993f7201a7e11f15671a21cf17551115e8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kevin=20NO=C3=8BL?= Date: Mon, 27 Jan 2025 17:44:27 +0100 Subject: [PATCH] add pubsub exporter config for ordering Some readme fixes --- exporter/googlecloudpubsubexporter/README.md | 52 ++++++++++++++++--- exporter/googlecloudpubsubexporter/config.go | 32 ++++++++++-- .../googlecloudpubsubexporter/config_test.go | 32 ++++++++---- exporter/googlecloudpubsubexporter/factory.go | 5 ++ .../testdata/config.yaml | 4 ++ 5 files changed, 105 insertions(+), 20 deletions(-) diff --git a/exporter/googlecloudpubsubexporter/README.md b/exporter/googlecloudpubsubexporter/README.md index 3a494927e0639..13c4e3dfe23f3 100644 --- a/exporter/googlecloudpubsubexporter/README.md +++ b/exporter/googlecloudpubsubexporter/README.md @@ -19,7 +19,7 @@ This exporter sends OTLP messages to a Google Cloud [Pubsub](https://cloud.googl The following configuration options are supported: * `project` (Optional): The Google Cloud Project of the topics. -* `topic` (Required): The topic name to receive OTLP data over. The topic name should be a fully qualified resource +* `topic` (Required): The topic name to send OTLP data over. The topic name should be a fully qualified resource name (eg: `projects/otel-project/topics/otlp`). * `compression` (Optional): Set the payload compression, only `gzip` is supported. Default is no compression. * `watermark` Behaviour of how the `ce-time` attribute is set (see watermark section for more info) @@ -31,17 +31,25 @@ The following configuration options are supported: or switching between [global and regional service endpoints](https://cloud.google.com/pubsub/docs/reference/service_apis_overview#service_endpoints). * `insecure` (Optional): allows performing “insecure” SSL connections and transfers, useful when connecting to a local emulator instance. Only has effect if Endpoint is not "" +* `ordering`: Configures the [PubSub ordering](https://cloud.google.com/pubsub/docs/ordering) feature, see + [ordering](#ordering) section for more info. + * `enabled` (default = `false`): Enables the ordering. Default is disabled. + * `from_resource_attribute` (no default): resource attribute that will be used as the ordering key. Required when + `ordering.enabled` is `true`. If the resource attribute is missing or has an empty value, the messages will not be + ordered for this resource. + * `remove_resource_attribute` (default = `false`): if the ordering key resource attribute specified + `from_resource_attribute` should be removed from the resource attributes. ```yaml exporters: googlecloudpubsub: project: my-project - topic: otlp-traces + topic: projects/my-project/topics/otlp-traces ``` ## Pubsub topic -The Google Cloud [Pubsub](https://cloud.google.com/pubsub) export doesn't automatic create topics, it expects the topic +The Google Cloud [Pubsub](https://cloud.google.com/pubsub) exporter doesn't automatically create topics, it expects the topic to be created upfront. Security wise it's best to give the collector its own service account and give the topic `Pub/Sub Publisher` permission. @@ -74,11 +82,11 @@ up to 20% of the cost. This can be done by setting the `compression` to `gzip`. exporters: googlecloudpubsub: project: my-project - topic: otlp-traces + topic: projects/my-project/topics/otlp-traces compression: gzip ``` -The exporter with add the `content-encoding` attribute to the message. The receiver will look at this attribute +The exporter will add the `content-encoding` attribute to the message. The receiver will look at this attribute to detect the compression that is used on the payload. Only `gzip` is supported. @@ -100,7 +108,7 @@ timestamp , if you want to behaviour to have effect as the default is `0s`. exporters: googlecloudpubsub: project: my-project - topic: otlp-traces + topic: projects/my-project/topics/otlp-traces watermark: behavior: earliest allow_drift: 1h @@ -119,3 +127,35 @@ scenario is `behavior: earliest` with a reasonable `allow_drift` of `1h`. Allowed behavior values are `current` or `earliest`. For `allow_drift` the default is `0s`, so make sure to set the value. + +## Ordering + +When ordering is enabled (`ordering.enabled`), you are required to specify a resource attribute key that will be used as +the ordering key (`ordering.from_resource_attribute`). If this resource attribute is only meant to be used as an +ordering key, you may want to choose to get this resource attribute key (`ordering.from_resource_attribute`) removed +before publishing to PubSub by enabling the `ordering.remove_resource_attribute` configuration. + +```yaml +exporters: + googlecloudpubsub: + project: my-project + topic: projects/my-project/topics/otlp-traces + ordering: + enabled: true + from_resource_attribute: some.resource.attribute.key + remove_resource_attribute: true +``` + +### Notes + +While the PubSub topic doesn't require any configuration for ordering, you will need to enable ordering on your +subscription(s) if you need it. Enabling ordering on a subscription is only possible at creation. +For composite ordering keys you'd need to compose the resource attribute value before exporting e.g., by using a +[transform processor](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/transformprocessor) +. + +Empty values in the ordering key are accepted but won't be ordered, see [PubSub ordering documentation](https://cloud.google.com/pubsub/docs/ordering) +for more details. + +PubSub requires one publish request per ordering key value, so this exporter groups the signals per ordering key before +publishing. diff --git a/exporter/googlecloudpubsubexporter/config.go b/exporter/googlecloudpubsubexporter/config.go index 829e659a6238e..b3c7e2c76a57a 100644 --- a/exporter/googlecloudpubsubexporter/config.go +++ b/exporter/googlecloudpubsubexporter/config.go @@ -10,6 +10,7 @@ import ( "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.uber.org/multierr" ) var topicMatcher = regexp.MustCompile(`^projects/[a-z][a-z0-9\-]*/topics/`) @@ -34,6 +35,8 @@ type Config struct { Compression string `mapstructure:"compression"` // Watermark defines the watermark (the ce-time attribute on the message) behavior Watermark WatermarkConfig `mapstructure:"watermark"` + // Ordering configures the ordering keys + Ordering OrderingConfig `mapstructure:"ordering"` } // WatermarkConfig customizes the behavior of the watermark @@ -46,15 +49,27 @@ type WatermarkConfig struct { AllowedDrift time.Duration `mapstructure:"allowed_drift"` } +// OrderingConfig customizes the behavior of the ordering +type OrderingConfig struct { + // Enabled indicates if ordering is enabled + Enabled bool `mapstructure:"enabled"` + // FromResourceAttribute is a resource attribute that will be used as the ordering key. + FromResourceAttribute string `mapstructure:"from_resource_attribute"` + // RemoveResourceAttribute indicates if the ordering key should be removed from the resource attributes. + RemoveResourceAttribute bool `mapstructure:"remove_resource_attribute"` +} + func (config *Config) Validate() error { + var errors error if !topicMatcher.MatchString(config.Topic) { - return fmt.Errorf("topic '%s' is not a valid format, use 'projects//topics/'", config.Topic) + errors = multierr.Append(errors, fmt.Errorf("topic '%s' is not a valid format, use 'projects//topics/'", config.Topic)) } - _, err := config.parseCompression() - if err != nil { - return err + if _, err := config.parseCompression(); err != nil { + errors = multierr.Append(errors, err) } - return config.Watermark.validate() + errors = multierr.Append(errors, config.Watermark.validate()) + errors = multierr.Append(errors, config.Ordering.validate()) + return errors } func (config *WatermarkConfig) validate() error { @@ -65,6 +80,13 @@ func (config *WatermarkConfig) validate() error { return err } +func (cfg *OrderingConfig) validate() error { + if cfg.Enabled && cfg.FromResourceAttribute == "" { + return fmt.Errorf("'from_resource_attribute' is required if ordering is enabled") + } + return nil +} + func (config *Config) parseCompression() (compression, error) { switch config.Compression { case "gzip": diff --git a/exporter/googlecloudpubsubexporter/config_test.go b/exporter/googlecloudpubsubexporter/config_test.go index 0a7943d000db3..dbd7cad265f78 100644 --- a/exporter/googlecloudpubsubexporter/config_test.go +++ b/exporter/googlecloudpubsubexporter/config_test.go @@ -34,18 +34,21 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.NoError(t, sub.Unmarshal(cfg)) - customConfig := factory.CreateDefaultConfig().(*Config) + expectedConfig := factory.CreateDefaultConfig().(*Config) - customConfig.ProjectID = "my-project" - customConfig.UserAgent = "opentelemetry-collector-contrib {{version}}" - customConfig.TimeoutSettings = exporterhelper.TimeoutConfig{ + expectedConfig.ProjectID = "my-project" + expectedConfig.UserAgent = "opentelemetry-collector-contrib {{version}}" + expectedConfig.TimeoutSettings = exporterhelper.TimeoutConfig{ Timeout: 20 * time.Second, } - customConfig.Topic = "projects/my-project/topics/otlp-topic" - customConfig.Compression = "gzip" - customConfig.Watermark.Behavior = "earliest" - customConfig.Watermark.AllowedDrift = time.Hour - assert.Equal(t, cfg, customConfig) + expectedConfig.Topic = "projects/my-project/topics/otlp-topic" + expectedConfig.Compression = "gzip" + expectedConfig.Watermark.Behavior = "earliest" + expectedConfig.Watermark.AllowedDrift = time.Hour + expectedConfig.Ordering.Enabled = true + expectedConfig.Ordering.FromResourceAttribute = "ordering_key" + expectedConfig.Ordering.RemoveResourceAttribute = true + assert.Equal(t, expectedConfig, cfg) } func TestTopicConfigValidation(t *testing.T) { @@ -100,3 +103,14 @@ func TestWatermarkDefaultMaxDriftValidation(t *testing.T) { assert.NoError(t, c.Validate()) assert.Equal(t, time.Duration(9223372036854775807), c.Watermark.AllowedDrift) } + +func TestOrderConfigValidation(t *testing.T) { + factory := NewFactory() + c := factory.CreateDefaultConfig().(*Config) + c.Topic = "projects/project/topics/my-topic" + assert.NoError(t, c.Validate()) + c.Ordering.Enabled = true + assert.Error(t, c.Validate()) + c.Ordering.FromResourceAttribute = "key" + assert.NoError(t, c.Validate()) +} diff --git a/exporter/googlecloudpubsubexporter/factory.go b/exporter/googlecloudpubsubexporter/factory.go index b60be1b0a8bc6..ec723119cdfa6 100644 --- a/exporter/googlecloudpubsubexporter/factory.go +++ b/exporter/googlecloudpubsubexporter/factory.go @@ -81,6 +81,11 @@ func createDefaultConfig() component.Config { Behavior: "current", AllowedDrift: 0, }, + Ordering: OrderingConfig{ + Enabled: false, + FromResourceAttribute: "", + RemoveResourceAttribute: false, + }, } } diff --git a/exporter/googlecloudpubsubexporter/testdata/config.yaml b/exporter/googlecloudpubsubexporter/testdata/config.yaml index 63e43036e294d..765192434f933 100644 --- a/exporter/googlecloudpubsubexporter/testdata/config.yaml +++ b/exporter/googlecloudpubsubexporter/testdata/config.yaml @@ -8,3 +8,7 @@ googlecloudpubsub/customname: watermark: behavior: earliest allowed_drift: 1h + ordering: + enabled: true + from_resource_attribute: ordering_key + remove_resource_attribute: true