Skip to content

Commit

Permalink
Add kafka topics observer implementation (#38060)
Browse files Browse the repository at this point in the history
#### Description
Add implementation and tests for kafka topics observer

#### Link to tracking issue
New component
#37665 

#### Testing
Unit tests
  • Loading branch information
wojtekzyla authored Feb 24, 2025
1 parent 2039671 commit ad684d6
Show file tree
Hide file tree
Showing 16 changed files with 501 additions and 72 deletions.
27 changes: 27 additions & 0 deletions .chloggen/feat_kafka-topic-observer-impl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: new_component

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkatopicsobserver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Adding implementation and tests of the component's logic."

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [37665]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
13 changes: 13 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
HostPortType EndpointType = "hostport"
// ContainerType is a container endpoint.
ContainerType EndpointType = "container"
// KafkaTopicType is a kafka topic endpoint
KafkaTopicType EndpointType = "kafka.topics"
)

var (
Expand All @@ -45,6 +47,7 @@ var (
_ EndpointDetails = (*K8sNode)(nil)
_ EndpointDetails = (*HostPort)(nil)
_ EndpointDetails = (*Container)(nil)
_ EndpointDetails = (*KafkaTopic)(nil)
)

// EndpointDetails provides additional context about an endpoint such as a Pod or Port.
Expand Down Expand Up @@ -387,3 +390,13 @@ func (n *K8sNode) Env() EndpointEnv {
func (n *K8sNode) Type() EndpointType {
return K8sNodeType
}

type KafkaTopic struct{}

func (k *KafkaTopic) Env() EndpointEnv {
return map[string]any{}
}

func (k *KafkaTopic) Type() EndpointType {
return KafkaTopicType
}
14 changes: 14 additions & 0 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,20 @@ func TestEndpointEnv(t *testing.T) {
"host": "192.68.73.2",
},
},
{
name: "Kafka topic",
endpoint: Endpoint{
ID: EndpointID("topic1"),
Target: "topic1",
Details: &KafkaTopic{},
},
want: EndpointEnv{
"id": "topic1",
"type": "kafka.topics",
"host": "topic1",
"endpoint": "topic1",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
7 changes: 3 additions & 4 deletions extension/observer/kafkatopicsobserver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [beta] |
| Distributions | [contrib] |
| Stability | [development] |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aextension%2Fkafkatopicsobserver%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aextension%2Fkafkatopicsobserver) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aextension%2Fkafkatopicsobserver%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aextension%2Fkafkatopicsobserver) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) |

[beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
<!-- end autogenerated section -->

The Kafka topics observer extension is a [Receiver Creator](../../../receiver/receivercreator/README.md)-compatible "watch observer" that will detect and report
Expand Down
32 changes: 27 additions & 5 deletions extension/observer/kafkatopicsobserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
package kafkatopicsobserver // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver"

import (
"fmt"
"time"

"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

Expand All @@ -23,11 +26,30 @@ type Config struct {
// Session interval for the Kafka consumer
SessionTimeout time.Duration `mapstructure:"session_timeout"`
// Heartbeat interval for the Kafka consumer
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
Authentication kafka.Authentication `mapstructure:"auth"`
TopicRegex string `mapstructure:"topic_regex"`
HeartbeatInterval time.Duration `mapstructure:"heartbeat_interval"`
Authentication kafka.Authentication `mapstructure:"auth"`
TopicRegex string `mapstructure:"topic_regex"`
TopicsSyncInterval time.Duration `mapstructure:"topics_sync_interval"`
}

func (config Config) Validate() error {
return nil
func (config *Config) Validate() (errs error) {
if len(config.Brokers) == 0 {
errs = multierr.Append(errs, fmt.Errorf("brokers list must be specified"))
}
if len(config.ProtocolVersion) == 0 {
errs = multierr.Append(errs, fmt.Errorf("protocol_version must be specified"))
}
if len(config.TopicRegex) == 0 {
errs = multierr.Append(errs, fmt.Errorf("topic_regex must be specified"))
}
if config.TopicsSyncInterval <= 0 {
errs = multierr.Append(errs, fmt.Errorf("topics_sync_interval must be greater than 0"))
}
if config.SessionTimeout <= 0 {
errs = multierr.Append(errs, fmt.Errorf("session_timeout must be greater than 0"))
}
if config.HeartbeatInterval <= 0 {
errs = multierr.Append(errs, fmt.Errorf("heartbeat_interval must be greater than 0"))
}
return errs
}
150 changes: 150 additions & 0 deletions extension/observer/kafkatopicsobserver/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkatopicsobserver

import (
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/confmap/xconfmap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/kafkatopicsobserver/internal/metadata"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka"
)

func TestLoadConfig(t *testing.T) {
t.Parallel()

tests := []struct {
id component.ID
expected component.Config
expectedError string
}{
{
id: component.NewID(metadata.Type),
expected: NewFactory().CreateDefaultConfig(),
expectedError: "protocol_version must be specified; topic_regex must be specified",
},
{
id: component.NewIDWithName(metadata.Type, "all_settings"),
expected: &Config{
ProtocolVersion: "3.7.0",
Brokers: []string{"1.2.3.4:9092", "2.3.4.5:9092"},
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 5 * time.Second,
ResolveCanonicalBootstrapServersOnly: false,
SessionTimeout: 30 * time.Second,
HeartbeatInterval: 20 * time.Second,
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "fooUser",
Password: "fooPassword",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.id.String(), func(t *testing.T) {
cfg := loadConfig(t, tt.id)
if tt.expectedError != "" {
assert.EqualError(t, xconfmap.Validate(cfg), tt.expectedError)
} else {
assert.NoError(t, xconfmap.Validate(cfg))
}
assert.Equal(t, tt.expected, cfg)
})
}
}

func TestValidateConfig(t *testing.T) {
cfg := &Config{
Brokers: []string{},
ProtocolVersion: "3.7.0",
TopicRegex: "^test[0-9]$",
}
assert.Equal(t, "brokers list must be specified; topics_sync_interval must be greater than 0; session_timeout must be greater than 0; heartbeat_interval must be greater than 0", xconfmap.Validate(cfg).Error())

cfg = &Config{
Brokers: []string{"1.2.3.4:9092"},
ProtocolVersion: "",
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 1 * time.Second,
SessionTimeout: 1 * time.Second,
HeartbeatInterval: 1 * time.Second,
}
assert.Equal(t, "protocol_version must be specified", xconfmap.Validate(cfg).Error())

cfg = &Config{
Brokers: []string{"1.2.3.4:9092"},
ProtocolVersion: "3.7.0",
TopicRegex: "",
TopicsSyncInterval: 1 * time.Second,
SessionTimeout: 1 * time.Second,
HeartbeatInterval: 1 * time.Second,
}
assert.Equal(t, "topic_regex must be specified", xconfmap.Validate(cfg).Error())

cfg = &Config{
Brokers: []string{"1.2.3.4:9092"},
ProtocolVersion: "3.7.0",
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 0 * time.Second,
SessionTimeout: 1 * time.Second,
HeartbeatInterval: 1 * time.Second,
}
assert.Equal(t, "topics_sync_interval must be greater than 0", xconfmap.Validate(cfg).Error())

cfg = &Config{
Brokers: []string{"1.2.3.4:9092"},
ProtocolVersion: "3.7.0",
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 1 * time.Second,
SessionTimeout: 0 * time.Second,
HeartbeatInterval: 1 * time.Second,
}
assert.Equal(t, "session_timeout must be greater than 0", xconfmap.Validate(cfg).Error())

cfg = &Config{
Brokers: []string{"1.2.3.4:9092"},
ProtocolVersion: "3.7.0",
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 1 * time.Second,
SessionTimeout: 1 * time.Second,
HeartbeatInterval: 0 * time.Second,
}
assert.Equal(t, "heartbeat_interval must be greater than 0", xconfmap.Validate(cfg).Error())

cfg = &Config{
Brokers: []string{"1.2.3.4:9092"},
ProtocolVersion: "3.7.0",
TopicRegex: "^topic[0-9]$",
TopicsSyncInterval: 1 * time.Second,
SessionTimeout: 1 * time.Second,
HeartbeatInterval: 1 * time.Second,
}
assert.NoError(t, xconfmap.Validate(cfg))
}

func loadConf(tb testing.TB, path string, id component.ID) *confmap.Conf {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", path))
require.NoError(tb, err)
sub, err := cm.Sub(id.String())
require.NoError(tb, err)
return sub
}

func loadConfig(tb testing.TB, id component.ID) *Config {
factory := NewFactory()
cfg := factory.CreateDefaultConfig()
sub := loadConf(tb, "config.yaml", id)
require.NoError(tb, sub.Unmarshal(cfg))
return cfg.(*Config)
}
Loading

0 comments on commit ad684d6

Please sign in to comment.