Skip to content

Commit

Permalink
feat: add Delegation Token APIs to ClusterAdmin
Browse files Browse the repository at this point in the history
This commit adds the KIP-48 APIs regarding token delegation to `ClusterAdmin`.

These APIs have been available on Brokers since v1.1 and the Java
AdminClient has had these facilities since v2.0.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka

Signed-off-by: Chris <[email protected]>
  • Loading branch information
Chrisss93 committed Nov 29, 2023
1 parent 3e385a6 commit fc430c1
Show file tree
Hide file tree
Showing 22 changed files with 1,565 additions and 4 deletions.
9 changes: 9 additions & 0 deletions acl_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
AclOperationDescribeConfigs
AclOperationAlterConfigs
AclOperationIdempotentWrite
AclOperationCreateTokens
AclOperationDescribeTokens
)

func (a *AclOperation) String() string {
Expand All @@ -47,6 +49,8 @@ func (a *AclOperation) String() string {
AclOperationDescribeConfigs: "DescribeConfigs",
AclOperationAlterConfigs: "AlterConfigs",
AclOperationIdempotentWrite: "IdempotentWrite",
AclOperationCreateTokens: "CreateTokens",
AclOperationDescribeTokens: "DescribeTokens",
}
s, ok := mapping[*a]
if !ok {
Expand Down Expand Up @@ -77,6 +81,8 @@ func (a *AclOperation) UnmarshalText(text []byte) error {
"describeconfigs": AclOperationDescribeConfigs,
"alterconfigs": AclOperationAlterConfigs,
"idempotentwrite": AclOperationIdempotentWrite,
"createtokens": AclOperationCreateTokens,
"describetokens": AclOperationDescribeTokens,
}
ao, ok := mapping[normalized]
if !ok {
Expand Down Expand Up @@ -142,6 +148,7 @@ const (
AclResourceCluster
AclResourceTransactionalID
AclResourceDelegationToken
AclResourceUser
)

func (a *AclResourceType) String() string {
Expand All @@ -153,6 +160,7 @@ func (a *AclResourceType) String() string {
AclResourceCluster: "Cluster",
AclResourceTransactionalID: "TransactionalID",
AclResourceDelegationToken: "DelegationToken",
AclResourceUser: "User",
}
s, ok := mapping[*a]
if !ok {
Expand All @@ -177,6 +185,7 @@ func (a *AclResourceType) UnmarshalText(text []byte) error {
"cluster": AclResourceCluster,
"transactionalid": AclResourceTransactionalID,
"delegationtoken": AclResourceDelegationToken,
"user": AclResourceUser,
}

art, ok := mapping[normalized]
Expand Down
119 changes: 119 additions & 0 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,22 @@ type ClusterAdmin interface {
// This is for static membership feature. KIP-345
RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error)

// Creates a renewable delegation token based on the access privileges of the token's owner. If a nil argument
// is provided, the owner is assumed to be the admin client user. The renewer argument allows users other
// than the token's owner to renew/expire the token. When the maxLifetime argument is -1 milliseconds,
// the server default will be used.
CreateDelegationToken(renewers []string, owner *string, maxLifetime time.Duration) (*DelegationToken, error)

// Renews an existing delegation token (identified by its hmac), returning the new expiration time.
RenewDelegationToken(hmac []byte, period time.Duration) (time.Time, error)

// Changes the expiration time for an existing delegation token (identified by its hmac), returning the new
// expiration time. Set the period to -1 milliseconds to immediately expire a token.
ExpireDelegationToken(hmac []byte, period time.Duration) (time.Time, error)

// Returns the information for all delegation tokens owned by the users supplied in the argument.
DescribeDelegationToken(owners []string) ([]RenewableToken, error)

// Close shuts down the admin and closes underlying client.
Close() error
}
Expand Down Expand Up @@ -1269,3 +1285,106 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInsta
}
return controller.LeaveGroup(request)
}

func (ca *clusterAdmin) CreateDelegationToken(renewers []string, owner *string, maxLifetime time.Duration) (*DelegationToken, error) {

Check failure on line 1289 in admin.go

View workflow job for this annotation

GitHub Actions / Linting with Go 1.22.x

unnecessary leading newline (whitespace)

controller, err := ca.client.Controller()
if err != nil {
return nil, err
}

resource := AclResourceUser
request := &CreateDelegationTokenRequest{
Version: 2,
MaxLifetime: maxLifetime,
Renewers: make([]Principal, len(renewers)),
}

if ca.conf.Version.IsAtLeast(V3_3_0_0) {
request.Version = 3
if owner != nil && len(*owner) > 0 {
user := resource.String()
request.OwnerPrincipalType = &user
request.OwnerName = owner
}
}

for i, r := range renewers {
request.Renewers[i] = Principal{resource.String(), r}
}

rsp, err := controller.CreateDelegationToken(request)
if err != nil {
return nil, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return nil, rsp.ErrorCode
}

return &rsp.DelegationToken, nil
}

func (ca *clusterAdmin) RenewDelegationToken(hmac []byte, period time.Duration) (time.Time, error) {
var expiry time.Time
controller, err := ca.client.Controller()
if err != nil {
return expiry, err
}

request := RenewDelegationTokenRequest{Version: 2, HMAC: hmac, RenewalPeriod: period}
rsp, err := controller.RenewDelegationToken(&request)
if err != nil {
return expiry, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return expiry, rsp.ErrorCode
}

return rsp.ExpiryTime, nil
}

func (ca *clusterAdmin) ExpireDelegationToken(hmac []byte, period time.Duration) (time.Time, error) {
var expiry time.Time

controller, err := ca.client.Controller()
if err != nil {
return expiry, err
}

request := ExpireDelegationTokenRequest{Version: 2, HMAC: hmac, ExpiryPeriod: period}
rsp, err := controller.ExpireDelegationToken(&request)
if err != nil {
return expiry, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return expiry, rsp.ErrorCode
}

return rsp.ExpiryTime, nil
}

func (ca *clusterAdmin) DescribeDelegationToken(owners []string) ([]RenewableToken, error) {
var tokens []RenewableToken

controller, err := ca.client.Controller()
if err != nil {
return tokens, err
}

resource := AclResourceUser
principals := make([]Principal, len(owners))
for i, x := range owners {
principals[i] = Principal{resource.String(), x}
}

request := DescribeDelegationTokenRequest{Version: 2, Owners: principals}
rsp, err := controller.DescribeDelegationToken(&request)
if err != nil {
return tokens, err
}
if !errors.Is(rsp.ErrorCode, ErrNoError) {
return tokens, rsp.ErrorCode
}

return rsp.Tokens, nil
}
48 changes: 48 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,54 @@ func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterCli
return response, nil
}

func (b *Broker) CreateDelegationToken(request *CreateDelegationTokenRequest) (*CreateDelegationTokenResponse, error) {
response := new(CreateDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) RenewDelegationToken(request *RenewDelegationTokenRequest) (*RenewDelegationTokenResponse, error) {
response := new(RenewDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) ExpireDelegationToken(request *ExpireDelegationTokenRequest) (*ExpireDelegationTokenResponse, error) {
response := new(ExpireDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) DescribeDelegationToken(request *DescribeDelegationTokenRequest) (*DescribeDelegationTokenResponse, error) {
response := new(DescribeDelegationTokenResponse)
response.Version = request.version()

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

// readFull ensures the conn ReadDeadline has been setup before making a
// call to io.ReadFull
func (b *Broker) readFull(buf []byte) (n int, err error) {
Expand Down
118 changes: 118 additions & 0 deletions delegation_token_create_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package sarama

import "time"

type CreateDelegationTokenRequest struct {
Version int16
OwnerPrincipalType *string
OwnerName *string
Renewers []Principal
MaxLifetime time.Duration
}

func (c *CreateDelegationTokenRequest) encode(pe packetEncoder) (err error) {
if c.Version > 2 {
if err = pe.putNullableCompactString(c.OwnerPrincipalType); err != nil {
return err
}
if err = pe.putNullableCompactString(c.OwnerName); err != nil {
return err
}
}

if c.Version > 1 {
pe.putCompactArrayLength(len(c.Renewers))
} else if err = pe.putArrayLength(len(c.Renewers)); err != nil {
return err
}

for _, r := range c.Renewers {
if err = r.encode(pe, c.Version); err != nil {
return err
}
if c.Version > 1 {
pe.putEmptyTaggedFieldArray()
}
}

pe.putInt64(c.MaxLifetime.Milliseconds())

if c.Version > 1 {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (c *CreateDelegationTokenRequest) decode(pd packetDecoder, version int16) (err error) {
c.Version = version

if version > 2 {
if c.OwnerPrincipalType, err = pd.getCompactNullableString(); err != nil {
return err
}
if c.OwnerName, err = pd.getCompactNullableString(); err != nil {
return err
}
}

var n int
if version > 1 {
n, err = pd.getCompactArrayLength()
} else {
n, err = pd.getArrayLength()
}
if err != nil {
return err
}
c.Renewers = make([]Principal, n)
for i := range c.Renewers {
if err := c.Renewers[i].decode(pd, version); err != nil {
return err
}
if version > 1 {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

var ms int64
if ms, err = pd.getInt64(); err == nil {
c.MaxLifetime = time.Duration(ms) * time.Millisecond
}

if version > 1 && err == nil {
_, err = pd.getEmptyTaggedFieldArray()
}

return err
}

func (c *CreateDelegationTokenRequest) key() int16 {
return 38
}

func (c *CreateDelegationTokenRequest) version() int16 {
return c.Version
}

func (c *CreateDelegationTokenRequest) headerVersion() int16 {
if c.Version > 1 {
return 2
}
return 1
}

func (c *CreateDelegationTokenRequest) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 3
}

func (c *CreateDelegationTokenRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 3:
return V3_3_0_0
default:
return V1_1_0_0
}
}
Loading

0 comments on commit fc430c1

Please sign in to comment.