Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Azure Event Hubs Entra SASL Mechanism #1338

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions sasl/azure_event_hubs_entra/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Azure Event Hubs Entra

Provides support for [Azure Event Hub with Kafka Protocol](https://learn.microsoft.com/en-us/azure/event-hubs/azure-event-hubs-kafka-overview),
using Azure Entra for authentication.

## How to use
This module is separate from the `kafka-go` module, since it is only required
for Event Hub users.

You can add this module to your dependencies by running the command below:
```shell
go get github.com/segmentio/kafka-go/sasl/azure_event_hubs_entra
```

To connect to Event Hub with Kafka protocol:
```go
package main

import (
"context"
"crypto/tls"
"fmt"
"os"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/azure_event_hubs_entra"
)

func main() {
// Create Azure Entra Default Credentials
cred, err := azidentity.NewDefaultAzureCredential(nil)

if err != nil {
fmt.Printf("failed to create Default Azure Credential: %s", err.Error())
os.Exit(1)
}

// Create Azure Entra SASL Mechanism
entraMechanism := azure_event_hubs_entra.NewMechanism(cred)

// Reader
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"<Event Hub Namespace Name>.servicebus.windows.net:9093"},
GroupID: "<Arbitrary Consumer Group Id>",
Topic: "<Event Hub Name>",
Dialer: &kafka.Dialer{
SASLMechanism: entraMechanism,
TLS: &tls.Config{},
},
})

defer r.Close()

// Writer
w := kafka.NewWriter(kafka.WriterConfig{
Brokers: []string{"<Event Hub Namespace Name>.servicebus.windows.net:9093"},
Topic: "<Event Hub Name>",
Dialer: &kafka.Dialer{
SASLMechanism: entraMechanism,
TLS: &tls.Config{},
},
})

defer w.Close()

err = w.WriteMessages(context.Background(), kafka.Message{
Value: []byte("test"),
})

if err != nil {
fmt.Printf("failed to write message: %s", err.Error())
os.Exit(2)
}

message, err := r.ReadMessage(context.Background())

if err != nil {
fmt.Printf("failed to read message: %s", err.Error())
os.Exit(3)
}

fmt.Printf("received message: %s", string(message.Value))
}

```
71 changes: 71 additions & 0 deletions sasl/azure_event_hubs_entra/azure_event_hubs_entra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package azure_event_hubs_entra

import (
"context"
"errors"
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/segmentio/kafka-go/sasl"
)

type Mechanism struct {
tokenCredential azcore.TokenCredential
}

func (*Mechanism) Name() string {
return "OAUTHBEARER"
}

func (m *Mechanism) Start(ctx context.Context) (sasl.StateMachine, []byte, error) {
saslMeta := sasl.MetadataFromContext(ctx)

if saslMeta == nil {
return nil, nil, errors.New("missing sasl metadata")
}

entraToken, err := m.getEntraToken(ctx, saslMeta)

if err != nil {
return nil, nil, err
}

// See https://datatracker.ietf.org/doc/html/rfc7628
saslResponse := fmt.Sprintf("n,,\x01auth=Bearer %s\x01\x01", entraToken.Token)

return m, []byte(saslResponse), nil
}

func (m *Mechanism) getEntraToken(ctx context.Context, saslMeta *sasl.Metadata) (azcore.AccessToken, error) {
tokenRequestOptions := buildTokenRequestOptions(saslMeta)

entraToken, err := m.tokenCredential.GetToken(ctx, tokenRequestOptions)

if err == nil {
return entraToken, nil
} else {
err := fmt.Errorf("failed to request an Azure Entra Token: %w", err)
return entraToken, err
}

}

func buildTokenRequestOptions(saslMeta *sasl.Metadata) policy.TokenRequestOptions {
tokenRequestOptions := policy.TokenRequestOptions{
Scopes: []string{"https://" + saslMeta.Host + "/.default"},
EnableCAE: false,
}

return tokenRequestOptions
}

func (m *Mechanism) Next(ctx context.Context, challenge []byte) (done bool, response []byte, err error) {
return true, nil, nil
}

func NewMechanism(tokenCredential azcore.TokenCredential) *Mechanism {
return &Mechanism{
tokenCredential: tokenCredential,
}
}
146 changes: 146 additions & 0 deletions sasl/azure_event_hubs_entra/azure_event_hubs_entra_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package azure_event_hubs_entra

import (
"context"
"errors"
"fmt"
"strings"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/segmentio/kafka-go/sasl"
)

type MockTokenCredential struct {
getTokenFunc func() (string, error)
}

func (c *MockTokenCredential) GetToken(ctx context.Context, options policy.TokenRequestOptions) (azcore.AccessToken, error) {
if len(options.Scopes) != 1 {
return azcore.AccessToken{}, fmt.Errorf("Scopes must contain 1 element! Contains %d elements.", len(options.Scopes))
}

scope := options.Scopes[0]

if !strings.HasPrefix(scope, "https://") {
return azcore.AccessToken{}, fmt.Errorf("Scope must start with https, and it did not.")
}

if !strings.HasSuffix(scope, "/.default") {
return azcore.AccessToken{}, fmt.Errorf("Scope must end with /.default, and it did not.")
}

if options.EnableCAE {
return azcore.AccessToken{}, fmt.Errorf("CAE must be false. It was true.")
}

token, err := c.getTokenFunc()

if err != nil {
return azcore.AccessToken{}, err
}

return azcore.AccessToken{Token: token}, nil
}

func TestName(t *testing.T) {
mechanism := NewMechanism(&MockTokenCredential{
getTokenFunc: func() (string, error) { return "testtoken", nil },
})

expected := "OAUTHBEARER"
actual := mechanism.Name()

if expected != actual {
t.Fatalf("Expected: %s - Actual: %s", expected, actual)
}
}

func TestStart(t *testing.T) {
mechanism := NewMechanism(&MockTokenCredential{
getTokenFunc: func() (string, error) { return "testtoken", nil },
})

ctx := sasl.WithMetadata(context.Background(), &sasl.Metadata{
Host: "test.servicebus.windows.net",
Port: 9093,
})

stateMachine, saslBytes, err := mechanism.Start(ctx)

if stateMachine == nil {
t.Fatalf("Expected stateMachine to be non-nil")
}

expectedSaslData := "n,,\x01auth=Bearer testtoken\x01\x01"

if string(saslBytes) != expectedSaslData {
t.Fatalf("expected saslData to be %s. Received %s.", expectedSaslData, string(saslBytes))
}

if err != nil {
t.Fatalf("expected err to be nil")
}
}

func TestStartNoMetadata(t *testing.T) {
mechanism := NewMechanism(&MockTokenCredential{
getTokenFunc: func() (string, error) { return "testtoken", nil },
})

ctx := context.Background()

stateMachine, saslBytes, err := mechanism.Start(ctx)

assertStartError(stateMachine, t, saslBytes, err, "missing sasl metadata")
}

func TestStartTokenError(t *testing.T) {
mechanism := NewMechanism(&MockTokenCredential{
getTokenFunc: func() (string, error) { return "", errors.New("Failed to acquire token") },
})

ctx := sasl.WithMetadata(context.Background(), &sasl.Metadata{
Host: "test.servicebus.windows.net",
Port: 9093,
})

stateMachine, saslBytes, err := mechanism.Start(ctx)

assertStartError(stateMachine, t, saslBytes, err, "failed to request an Azure Entra Token: Failed to acquire token")
}

func assertStartError(stateMachine sasl.StateMachine, t *testing.T, saslBytes []byte, err error, expectedError string) {
if stateMachine != nil {
t.Fatalf("Expected stateMachine to be nil")
}

if saslBytes != nil {
t.Fatalf("Expected saslBytes to be nil")
}

if err.Error() != expectedError {
t.Fatalf("expected err to be %s. was %s", expectedError, err.Error())
}
}

func TestNext(t *testing.T) {
mechanism := NewMechanism(&MockTokenCredential{
getTokenFunc: func() (string, error) { return "testtoken", nil },
})

done, response, err := mechanism.Next(context.Background(), []byte("challenge"))

if !done {
t.Fatalf("Expected done to be true")
}

if response != nil {
t.Fatalf("Expected nil response")
}

if err != nil {
t.Fatalf("Expected nil error")
}
}
9 changes: 9 additions & 0 deletions sasl/azure_event_hubs_entra/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
module github.com/segmentio/kafka-go/sasl/azure_event_hubs_entra

go 1.15

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.14.0
github.com/segmentio/kafka-go v0.4.47
golang.org/x/net v0.28.0 // indirect
)
Loading