From fc430c19ff20ba563d268452e1faeea8bd228c98 Mon Sep 17 00:00:00 2001 From: Chris Date: Tue, 28 Nov 2023 19:48:32 -0700 Subject: [PATCH] feat: add Delegation Token APIs to ClusterAdmin This commit adds the KIP-48 APIs regarding token delegation to `ClusterAdmin`. These APIs have been available on Brokers since v1.1 and the Java AdminClient has had these facilities since v2.0. https://cwiki.apache.org/confluence/display/KAFKA/KIP-48+Delegation+token+support+for+Kafka Signed-off-by: Chris --- acl_types.go | 9 ++ admin.go | 119 ++++++++++++++++ broker.go | 48 +++++++ delegation_token_create_request.go | 118 +++++++++++++++ delegation_token_create_request_test.go | 73 ++++++++++ delegation_token_create_response.go | 81 +++++++++++ delegation_token_create_response_test.go | 90 ++++++++++++ delegation_token_describe_request.go | 84 +++++++++++ delegation_token_describe_request_test.go | 51 +++++++ delegation_token_describe_response.go | 144 +++++++++++++++++++ delegation_token_describe_response_test.go | 158 +++++++++++++++++++++ delegation_token_expire_request.go | 76 ++++++++++ delegation_token_expire_request_test.go | 40 ++++++ delegation_token_expire_response.go | 72 ++++++++++ delegation_token_expire_response_test.go | 42 ++++++ delegation_token_renew_request.go | 76 ++++++++++ delegation_token_renew_request_test.go | 40 ++++++ delegation_token_renew_response.go | 72 ++++++++++ delegation_token_renew_response_test.go | 42 ++++++ delegation_token_types.go | 114 +++++++++++++++ request.go | 12 +- request_test.go | 8 ++ 22 files changed, 1565 insertions(+), 4 deletions(-) create mode 100644 delegation_token_create_request.go create mode 100644 delegation_token_create_request_test.go create mode 100644 delegation_token_create_response.go create mode 100644 delegation_token_create_response_test.go create mode 100644 delegation_token_describe_request.go create mode 100644 delegation_token_describe_request_test.go create mode 100644 delegation_token_describe_response.go create mode 100644 delegation_token_describe_response_test.go create mode 100644 delegation_token_expire_request.go create mode 100644 delegation_token_expire_request_test.go create mode 100644 delegation_token_expire_response.go create mode 100644 delegation_token_expire_response_test.go create mode 100644 delegation_token_renew_request.go create mode 100644 delegation_token_renew_request_test.go create mode 100644 delegation_token_renew_response.go create mode 100644 delegation_token_renew_response_test.go create mode 100644 delegation_token_types.go diff --git a/acl_types.go b/acl_types.go index 62bb5342a..ef31ce193 100644 --- a/acl_types.go +++ b/acl_types.go @@ -30,6 +30,8 @@ const ( AclOperationDescribeConfigs AclOperationAlterConfigs AclOperationIdempotentWrite + AclOperationCreateTokens + AclOperationDescribeTokens ) func (a *AclOperation) String() string { @@ -47,6 +49,8 @@ func (a *AclOperation) String() string { AclOperationDescribeConfigs: "DescribeConfigs", AclOperationAlterConfigs: "AlterConfigs", AclOperationIdempotentWrite: "IdempotentWrite", + AclOperationCreateTokens: "CreateTokens", + AclOperationDescribeTokens: "DescribeTokens", } s, ok := mapping[*a] if !ok { @@ -77,6 +81,8 @@ func (a *AclOperation) UnmarshalText(text []byte) error { "describeconfigs": AclOperationDescribeConfigs, "alterconfigs": AclOperationAlterConfigs, "idempotentwrite": AclOperationIdempotentWrite, + "createtokens": AclOperationCreateTokens, + "describetokens": AclOperationDescribeTokens, } ao, ok := mapping[normalized] if !ok { @@ -142,6 +148,7 @@ const ( AclResourceCluster AclResourceTransactionalID AclResourceDelegationToken + AclResourceUser ) func (a *AclResourceType) String() string { @@ -153,6 +160,7 @@ func (a *AclResourceType) String() string { AclResourceCluster: "Cluster", AclResourceTransactionalID: "TransactionalID", AclResourceDelegationToken: "DelegationToken", + AclResourceUser: "User", } s, ok := mapping[*a] if !ok { @@ -177,6 +185,7 @@ func (a *AclResourceType) UnmarshalText(text []byte) error { "cluster": AclResourceCluster, "transactionalid": AclResourceTransactionalID, "delegationtoken": AclResourceDelegationToken, + "user": AclResourceUser, } art, ok := mapping[normalized] diff --git a/admin.go b/admin.go index dcf1d7659..88b9ee3e0 100644 --- a/admin.go +++ b/admin.go @@ -146,6 +146,22 @@ type ClusterAdmin interface { // This is for static membership feature. KIP-345 RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) + // Creates a renewable delegation token based on the access privileges of the token's owner. If a nil argument + // is provided, the owner is assumed to be the admin client user. The renewer argument allows users other + // than the token's owner to renew/expire the token. When the maxLifetime argument is -1 milliseconds, + // the server default will be used. + CreateDelegationToken(renewers []string, owner *string, maxLifetime time.Duration) (*DelegationToken, error) + + // Renews an existing delegation token (identified by its hmac), returning the new expiration time. + RenewDelegationToken(hmac []byte, period time.Duration) (time.Time, error) + + // Changes the expiration time for an existing delegation token (identified by its hmac), returning the new + // expiration time. Set the period to -1 milliseconds to immediately expire a token. + ExpireDelegationToken(hmac []byte, period time.Duration) (time.Time, error) + + // Returns the information for all delegation tokens owned by the users supplied in the argument. + DescribeDelegationToken(owners []string) ([]RenewableToken, error) + // Close shuts down the admin and closes underlying client. Close() error } @@ -1269,3 +1285,106 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInsta } return controller.LeaveGroup(request) } + +func (ca *clusterAdmin) CreateDelegationToken(renewers []string, owner *string, maxLifetime time.Duration) (*DelegationToken, error) { + + controller, err := ca.client.Controller() + if err != nil { + return nil, err + } + + resource := AclResourceUser + request := &CreateDelegationTokenRequest{ + Version: 2, + MaxLifetime: maxLifetime, + Renewers: make([]Principal, len(renewers)), + } + + if ca.conf.Version.IsAtLeast(V3_3_0_0) { + request.Version = 3 + if owner != nil && len(*owner) > 0 { + user := resource.String() + request.OwnerPrincipalType = &user + request.OwnerName = owner + } + } + + for i, r := range renewers { + request.Renewers[i] = Principal{resource.String(), r} + } + + rsp, err := controller.CreateDelegationToken(request) + if err != nil { + return nil, err + } + if !errors.Is(rsp.ErrorCode, ErrNoError) { + return nil, rsp.ErrorCode + } + + return &rsp.DelegationToken, nil +} + +func (ca *clusterAdmin) RenewDelegationToken(hmac []byte, period time.Duration) (time.Time, error) { + var expiry time.Time + controller, err := ca.client.Controller() + if err != nil { + return expiry, err + } + + request := RenewDelegationTokenRequest{Version: 2, HMAC: hmac, RenewalPeriod: period} + rsp, err := controller.RenewDelegationToken(&request) + if err != nil { + return expiry, err + } + if !errors.Is(rsp.ErrorCode, ErrNoError) { + return expiry, rsp.ErrorCode + } + + return rsp.ExpiryTime, nil +} + +func (ca *clusterAdmin) ExpireDelegationToken(hmac []byte, period time.Duration) (time.Time, error) { + var expiry time.Time + + controller, err := ca.client.Controller() + if err != nil { + return expiry, err + } + + request := ExpireDelegationTokenRequest{Version: 2, HMAC: hmac, ExpiryPeriod: period} + rsp, err := controller.ExpireDelegationToken(&request) + if err != nil { + return expiry, err + } + if !errors.Is(rsp.ErrorCode, ErrNoError) { + return expiry, rsp.ErrorCode + } + + return rsp.ExpiryTime, nil +} + +func (ca *clusterAdmin) DescribeDelegationToken(owners []string) ([]RenewableToken, error) { + var tokens []RenewableToken + + controller, err := ca.client.Controller() + if err != nil { + return tokens, err + } + + resource := AclResourceUser + principals := make([]Principal, len(owners)) + for i, x := range owners { + principals[i] = Principal{resource.String(), x} + } + + request := DescribeDelegationTokenRequest{Version: 2, Owners: principals} + rsp, err := controller.DescribeDelegationToken(&request) + if err != nil { + return tokens, err + } + if !errors.Is(rsp.ErrorCode, ErrNoError) { + return tokens, rsp.ErrorCode + } + + return rsp.Tokens, nil +} diff --git a/broker.go b/broker.go index 268696cf4..a9c70767f 100644 --- a/broker.go +++ b/broker.go @@ -930,6 +930,54 @@ func (b *Broker) AlterClientQuotas(request *AlterClientQuotasRequest) (*AlterCli return response, nil } +func (b *Broker) CreateDelegationToken(request *CreateDelegationTokenRequest) (*CreateDelegationTokenResponse, error) { + response := new(CreateDelegationTokenResponse) + response.Version = request.version() + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) RenewDelegationToken(request *RenewDelegationTokenRequest) (*RenewDelegationTokenResponse, error) { + response := new(RenewDelegationTokenResponse) + response.Version = request.version() + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) ExpireDelegationToken(request *ExpireDelegationTokenRequest) (*ExpireDelegationTokenResponse, error) { + response := new(ExpireDelegationTokenResponse) + response.Version = request.version() + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + +func (b *Broker) DescribeDelegationToken(request *DescribeDelegationTokenRequest) (*DescribeDelegationTokenResponse, error) { + response := new(DescribeDelegationTokenResponse) + response.Version = request.version() + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + // readFull ensures the conn ReadDeadline has been setup before making a // call to io.ReadFull func (b *Broker) readFull(buf []byte) (n int, err error) { diff --git a/delegation_token_create_request.go b/delegation_token_create_request.go new file mode 100644 index 000000000..03624b880 --- /dev/null +++ b/delegation_token_create_request.go @@ -0,0 +1,118 @@ +package sarama + +import "time" + +type CreateDelegationTokenRequest struct { + Version int16 + OwnerPrincipalType *string + OwnerName *string + Renewers []Principal + MaxLifetime time.Duration +} + +func (c *CreateDelegationTokenRequest) encode(pe packetEncoder) (err error) { + if c.Version > 2 { + if err = pe.putNullableCompactString(c.OwnerPrincipalType); err != nil { + return err + } + if err = pe.putNullableCompactString(c.OwnerName); err != nil { + return err + } + } + + if c.Version > 1 { + pe.putCompactArrayLength(len(c.Renewers)) + } else if err = pe.putArrayLength(len(c.Renewers)); err != nil { + return err + } + + for _, r := range c.Renewers { + if err = r.encode(pe, c.Version); err != nil { + return err + } + if c.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + } + + pe.putInt64(c.MaxLifetime.Milliseconds()) + + if c.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (c *CreateDelegationTokenRequest) decode(pd packetDecoder, version int16) (err error) { + c.Version = version + + if version > 2 { + if c.OwnerPrincipalType, err = pd.getCompactNullableString(); err != nil { + return err + } + if c.OwnerName, err = pd.getCompactNullableString(); err != nil { + return err + } + } + + var n int + if version > 1 { + n, err = pd.getCompactArrayLength() + } else { + n, err = pd.getArrayLength() + } + if err != nil { + return err + } + c.Renewers = make([]Principal, n) + for i := range c.Renewers { + if err := c.Renewers[i].decode(pd, version); err != nil { + return err + } + if version > 1 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + + var ms int64 + if ms, err = pd.getInt64(); err == nil { + c.MaxLifetime = time.Duration(ms) * time.Millisecond + } + + if version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + + return err +} + +func (c *CreateDelegationTokenRequest) key() int16 { + return 38 +} + +func (c *CreateDelegationTokenRequest) version() int16 { + return c.Version +} + +func (c *CreateDelegationTokenRequest) headerVersion() int16 { + if c.Version > 1 { + return 2 + } + return 1 +} + +func (c *CreateDelegationTokenRequest) isValidVersion() bool { + return c.Version >= 0 && c.Version <= 3 +} + +func (c *CreateDelegationTokenRequest) requiredVersion() KafkaVersion { + switch c.Version { + case 3: + return V3_3_0_0 + default: + return V1_1_0_0 + } +} diff --git a/delegation_token_create_request_test.go b/delegation_token_create_request_test.go new file mode 100644 index 000000000..3f0bbbf1f --- /dev/null +++ b/delegation_token_create_request_test.go @@ -0,0 +1,73 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + createDelegationTokenRequestV0 = []byte{ + 0, 0, 0, 0, // Renewers + 255, 255, 255, 255, 255, 255, 255, 255, // Max life time + } + + createDelegationTokenRequestV1 = []byte{ + 0, 0, 0, 2, + 0, 4, 'U', 's', 'e', 'r', + 0, 5, 's', 'u', 'p', 'e', 'r', + 0, 4, 'U', 's', 'e', 'r', + 0, 5, 'a', 'd', 'm', 'i', 'n', // Renewers + 255, 255, 255, 255, 255, 255, 255, 255, // Max life time + } + + createDelegationTokenRequestV2 = []byte{ + 3, + 5, 'U', 's', 'e', 'r', + 6, 's', 'u', 'p', 'e', 'r', + 0, // Tag buffer + 5, 'U', 's', 'e', 'r', + 6, 'a', 'd', 'm', 'i', 'n', // Renewers + 0, // Tag buffer + 255, 255, 255, 255, 255, 255, 255, 255, // Max life time + 0, // Tag buffer + } + + createDelegationTokenRequestV3 = []byte{ + 0, // Owner principal type + 5, 't', 'e', 's', 't', // Owner principal name + 3, + 5, 'U', 's', 'e', 'r', + 6, 's', 'u', 'p', 'e', 'r', + 0, // Tag buffer + 5, 'U', 's', 'e', 'r', + 6, 'a', 'd', 'm', 'i', 'n', // Renewers + 0, // Tag buffer + 255, 255, 255, 255, 255, 255, 255, 255, // Max life time + 0, // Tag buffer + } +) + +func TestCreateDelegationTokenRequest(t *testing.T) { + user := AclResourceUser + + resp := &CreateDelegationTokenRequest{ + Renewers: []Principal{}, + MaxLifetime: -1 * time.Millisecond, + } + + testRequest(t, "version 0", resp, createDelegationTokenRequestV0) + + resp.Version = 1 + resp.Renewers = []Principal{{user.String(), "super"}, {user.String(), "admin"}} + + testRequest(t, "version 1", resp, createDelegationTokenRequestV1) + + resp.Version = 2 + testRequest(t, "version 2", resp, createDelegationTokenRequestV2) + + resp.Version = 3 + nm := "test" + resp.OwnerName = &nm + + testRequest(t, "version 3", resp, createDelegationTokenRequestV3) +} diff --git a/delegation_token_create_response.go b/delegation_token_create_response.go new file mode 100644 index 000000000..32b490b9d --- /dev/null +++ b/delegation_token_create_response.go @@ -0,0 +1,81 @@ +package sarama + +import ( + "time" +) + +type CreateDelegationTokenResponse struct { + DelegationToken + Version int16 + ErrorCode KError + ThrottleTime time.Duration +} + +func (c *CreateDelegationTokenResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(c.ErrorCode)) + + if err := c.DelegationToken.encode(pe, c.Version); err != nil { + return err + } + + pe.putInt32(int32(c.ThrottleTime / time.Millisecond)) + + if c.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (c *CreateDelegationTokenResponse) decode(pd packetDecoder, version int16) (err error) { + c.Version = version + + if errCode, err := pd.getInt16(); err == nil { + c.ErrorCode = KError(errCode) + } else { + return err + } + + if err := c.DelegationToken.decode(pd, version); err != nil { + return err + } + + var throttle int32 + if throttle, err = pd.getInt32(); err == nil { + c.ThrottleTime = time.Duration(throttle) * time.Millisecond + } + + if version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + + return err +} + +func (c *CreateDelegationTokenResponse) key() int16 { + return 38 +} + +func (c *CreateDelegationTokenResponse) version() int16 { + return c.Version +} + +func (c *CreateDelegationTokenResponse) headerVersion() int16 { + if c.Version > 1 { + return 1 + } + return 0 +} + +func (c *CreateDelegationTokenResponse) isValidVersion() bool { + return c.Version >= 0 && c.Version <= 3 +} + +func (r *CreateDelegationTokenResponse) requiredVersion() KafkaVersion { + switch r.Version { + case 3: + return V3_3_0_0 + default: + return V1_1_0_0 + } +} diff --git a/delegation_token_create_response_test.go b/delegation_token_create_response_test.go new file mode 100644 index 000000000..45fcaefc4 --- /dev/null +++ b/delegation_token_create_response_test.go @@ -0,0 +1,90 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + createDelegationTokenResponseV0 = []byte{ + 0, 0, + 0, 4, 'U', 's', 'e', 'r', + 0, 4, 't', 'e', 's', 't', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 0, 10, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 0, 0, 0, 4, 1, 2, 3, 4, // HMAC + 0, 0, 0, 0, // Throttle time + } + + createDelegationTokenResponseV1 = []byte{ + 0, 0, + 0, 4, 'U', 's', 'e', 'r', + 0, 4, 't', 'e', 's', 't', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 109, 221, 0, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 0, 10, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 0, 0, 0, 4, 1, 2, 3, 4, // HMAC + 0, 0, 0, 0, // Throttle time + } + + createDelegationTokenResponseV2 = []byte{ + 0, 0, + 5, 'U', 's', 'e', 'r', + 5, 't', 'e', 's', 't', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 109, 221, 0, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 11, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 5, 1, 2, 3, 4, // HMAC + 0, 0, 0, 0, // Throttle time + 0, // Tag buffer + } + + createDelegationTokenResponseV3 = []byte{ + 0, 0, + 5, 'U', 's', 'e', 'r', + 5, 't', 'e', 's', 't', + 5, 'U', 's', 'e', 'r', + 10, 'r', 'e', 'q', 'u', 'e', 's', 't', 'e', 'r', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 109, 221, 0, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 11, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 5, 1, 2, 3, 4, // HMAC + 0, 0, 0, 0, // Throttle time + 0, // Tag buffer + } +) + +func TestCreateDelegationTokenResponse(t *testing.T) { + user := AclResourceUser + resp := &CreateDelegationTokenResponse{ + DelegationToken: DelegationToken{ + Owner: Principal{user.String(), "test"}, + IssueTime: time.Unix(0, 0), + ExpiryTime: time.Unix(0, 0).Add(time.Hour), + MaxLifeTime: time.Unix(0, 0).Add(time.Hour * 24), + TokenID: "some-token", + HMAC: []byte{1, 2, 3, 4}, + }, + } + + testResponse(t, "version 0", resp, createDelegationTokenResponseV0) + + resp.Version = 1 + resp.ExpiryTime = resp.ExpiryTime.Add(time.Hour) + + testResponse(t, "version 1", resp, createDelegationTokenResponseV1) + + resp.Version = 2 + testResponse(t, "version 2", resp, createDelegationTokenResponseV2) + + resp.Version = 3 + resp.Requester = Principal{user.String(), "requester"} + + testResponse(t, "version 3", resp, createDelegationTokenResponseV3) + +} diff --git a/delegation_token_describe_request.go b/delegation_token_describe_request.go new file mode 100644 index 000000000..cfa8bb45f --- /dev/null +++ b/delegation_token_describe_request.go @@ -0,0 +1,84 @@ +package sarama + +type DescribeDelegationTokenRequest struct { + Version int16 + Owners []Principal +} + +func (d *DescribeDelegationTokenRequest) encode(pe packetEncoder) error { + + if d.Version > 1 { + pe.putCompactArrayLength(len(d.Owners)) + } else if err := pe.putArrayLength(len(d.Owners)); err != nil { + return err + } + + for _, p := range d.Owners { + if err := p.encode(pe, d.Version); err != nil { + return err + } + if d.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + } + + if d.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + return nil +} + +func (d *DescribeDelegationTokenRequest) decode(pd packetDecoder, version int16) (err error) { + d.Version = version + + var n int + if version > 1 { + n, err = pd.getCompactArrayLength() + } else { + n, err = pd.getArrayLength() + } + if err != nil { + return err + } + + d.Owners = make([]Principal, n) + for i := range d.Owners { + if err = d.Owners[i].decode(pd, version); err != nil { + return err + } + if version > 1 { + if _, err := pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + + if d.Version > 1 { + _, err = pd.getEmptyTaggedFieldArray() + } + + return err +} + +func (d *DescribeDelegationTokenRequest) key() int16 { + return 41 +} + +func (d *DescribeDelegationTokenRequest) version() int16 { + return d.Version +} + +func (d *DescribeDelegationTokenRequest) headerVersion() int16 { + if d.Version > 1 { + return 2 + } + return 1 +} + +func (d *DescribeDelegationTokenRequest) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 3 +} + +func (d *DescribeDelegationTokenRequest) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delegation_token_describe_request_test.go b/delegation_token_describe_request_test.go new file mode 100644 index 000000000..5c42a69be --- /dev/null +++ b/delegation_token_describe_request_test.go @@ -0,0 +1,51 @@ +package sarama + +import ( + "testing" +) + +var ( + describeDelegationTokenRequestV0 = []byte{ + 0, 0, 0, 1, + 0, 4, 'U', 's', 'e', 'r', + 0, 3, 'f', 'o', 'o', + } + + describeDelegationTokenRequestV1 = []byte{ + 0, 0, 0, 2, + 0, 4, 'U', 's', 'e', 'r', + 0, 3, 'f', 'o', 'o', + 0, 4, 'U', 's', 'e', 'r', + 0, 3, 'b', 'a', 'r', + } + + describeDelegationTokenRequestV2 = []byte{ + 3, + 5, 'U', 's', 'e', 'r', + 4, 'f', 'o', 'o', + 0, + 5, 'U', 's', 'e', 'r', + 4, 'b', 'a', 'r', + 0, + 0, + } +) + +func TestDescribeDelegationTokenRequest(t *testing.T) { + user := AclResourceUser + + resp := &DescribeDelegationTokenRequest{Owners: []Principal{{user.String(), "foo"}}} + + testRequest(t, "version 0", resp, describeDelegationTokenRequestV0) + + resp.Version = 1 + resp.Owners = append(resp.Owners, Principal{user.String(), "bar"}) + + testRequest(t, "version 1", resp, describeDelegationTokenRequestV1) + + resp.Version = 2 + testRequest(t, "version 2", resp, describeDelegationTokenRequestV2) + + resp.Version = 3 + testRequest(t, "version 3", resp, describeDelegationTokenRequestV2) +} diff --git a/delegation_token_describe_response.go b/delegation_token_describe_response.go new file mode 100644 index 000000000..6b59f150c --- /dev/null +++ b/delegation_token_describe_response.go @@ -0,0 +1,144 @@ +package sarama + +import "time" + +type DescribeDelegationTokenResponse struct { + Version int16 + ErrorCode KError + Tokens []RenewableToken + ThrottleTime time.Duration +} + +type RenewableToken struct { + DelegationToken + Renewers []Principal +} + +func (d *DescribeDelegationTokenResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(d.ErrorCode)) + + if d.Version > 1 { + pe.putCompactArrayLength(len(d.Tokens)) + } else if err := pe.putArrayLength(len(d.Tokens)); err != nil { + return err + } + + for _, t := range d.Tokens { + if err := t.encode(pe, d.Version); err != nil { + return err + } + + if d.Version > 1 { + pe.putCompactArrayLength(len(t.Renewers)) + } else if err := pe.putArrayLength(len(t.Renewers)); err != nil { + return err + } + for _, r := range t.Renewers { + if err := r.encode(pe, d.Version); err != nil { + return err + } + if d.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + } + + if d.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + } + + pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) + + if d.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (d *DescribeDelegationTokenResponse) decode(pd packetDecoder, version int16) (err error) { + d.Version = version + + if errCode, err := pd.getInt16(); err == nil { + d.ErrorCode = KError(errCode) + } else { + return err + } + + var n int + if version > 1 { + n, err = pd.getCompactArrayLength() + } else { + n, err = pd.getArrayLength() + } + if err != nil { + return err + } + + d.Tokens = make([]RenewableToken, n) + for i := range d.Tokens { + if err := d.Tokens[i].decode(pd, version); err != nil { + return err + } + + if version > 1 { + n, err = pd.getCompactArrayLength() + } else { + n, err = pd.getArrayLength() + } + if err != nil { + return err + } + + d.Tokens[i].Renewers = make([]Principal, n) + for j := range d.Tokens[i].Renewers { + if err = d.Tokens[i].Renewers[j].decode(pd, version); err != nil { + return err + } + if d.Version > 1 { + if _, err = pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + if d.Version > 1 { + if _, err = pd.getEmptyTaggedFieldArray(); err != nil { + return err + } + } + } + + var throttle int32 + if throttle, err = pd.getInt32(); err == nil { + d.ThrottleTime = time.Duration(throttle) * time.Millisecond + } + + if d.Version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + + return err +} + +func (d *DescribeDelegationTokenResponse) key() int16 { + return 41 +} + +func (d *DescribeDelegationTokenResponse) version() int16 { + return d.Version +} + +func (d *DescribeDelegationTokenResponse) headerVersion() int16 { + if d.Version > 1 { + return 1 + } + return 0 +} + +func (d *DescribeDelegationTokenResponse) isValidVersion() bool { + return d.Version >= 0 && d.Version <= 3 +} + +func (d *DescribeDelegationTokenResponse) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delegation_token_describe_response_test.go b/delegation_token_describe_response_test.go new file mode 100644 index 000000000..36725ce31 --- /dev/null +++ b/delegation_token_describe_response_test.go @@ -0,0 +1,158 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + describeDelegationTokenResponseV0 = []byte{ + 0, 0, + 0, 0, 0, 2, + 0, 4, 'U', 's', 'e', 'r', + 0, 4, 't', 'e', 's', 't', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 0, 10, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 0, 0, 0, 4, 1, 2, 3, 4, // HMAC + 0, 0, 0, 0, // Renewers + 0, 4, 'U', 's', 'e', 'r', + 0, 5, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 0, 13, 'a', 'n', 'o', 't', 'h', 'e', 'r', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 0, 0, 0, 4, 9, 8, 7, 6, // HMAC + 0, 0, 0, 0, //Renewers + 0, 0, 0, 0, // Throttle time + } + + describeDelegationTokenResponseV1 = []byte{ + 0, 0, + 0, 0, 0, 2, + 0, 4, 'U', 's', 'e', 'r', + 0, 4, 't', 'e', 's', 't', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 0, 10, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 0, 0, 0, 4, 1, 2, 3, 4, // HMAC + 0, 0, 0, 1, + 0, 4, 'U', 's', 'e', 'r', + 0, 5, 's', 'u', 'p', 'e', 'r', // Renewers + 0, 4, 'U', 's', 'e', 'r', + 0, 5, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 0, 13, 'a', 'n', 'o', 't', 'h', 'e', 'r', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 0, 0, 0, 4, 9, 8, 7, 6, // HMAC + 0, 0, 0, 0, // Renewers + 0, 0, 0, 0, // Throttle time + } + + describeDelegationTokenResponseV2 = []byte{ + 0, 0, + 3, + 5, 'U', 's', 'e', 'r', + 5, 't', 'e', 's', 't', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 11, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 5, 1, 2, 3, 4, // HMAC + 2, + 5, 'U', 's', 'e', 'r', + 6, 's', 'u', 'p', 'e', 'r', // Renewers + 0, // Tag buffer + 0, // Tag buffer + 5, 'U', 's', 'e', 'r', + 6, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 14, 'a', 'n', 'o', 't', 'h', 'e', 'r', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 5, 9, 8, 7, 6, // HMAC + 1, // Renewers + 0, // Tag buffer + 0, 0, 0, 0, // Throttle time + 0, // Tag buffer + } + + describeDelegationTokenResponseV3 = []byte{ + 0, 0, + 3, + 5, 'U', 's', 'e', 'r', + 5, 't', 'e', 's', 't', + 1, 1, + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 11, 's', 'o', 'm', 'e', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 5, 1, 2, 3, 4, // HMAC + 2, + 5, 'U', 's', 'e', 'r', + 6, 's', 'u', 'p', 'e', 'r', // Renewers + 0, // Tag buffer + 0, // Tag buffer + 5, 'U', 's', 'e', 'r', + 6, 'o', 't', 'h', 'e', 'r', + 5, 'U', 's', 'e', 'r', + 10, 'r', 'e', 'q', 'u', 'e', 's', 't', 'e', 'r', + 0, 0, 0, 0, 0, 0, 0, 0, // Issue time + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, 5, 38, 92, 0, // Max life time + 14, 'a', 'n', 'o', 't', 'h', 'e', 'r', '-', 't', 'o', 'k', 'e', 'n', // Token ID + 5, 9, 8, 7, 6, // HMAC + 1, // Renewers + 0, // Tag buffer + 0, 0, 0, 0, // Throttle time + 0, // Tag buffer + } +) + +func TestDescribeDelegationTokenResponse(t *testing.T) { + user := AclResourceUser + resp := &DescribeDelegationTokenResponse{ + Tokens: []RenewableToken{ + { + Renewers: []Principal{}, + DelegationToken: DelegationToken{ + Owner: Principal{user.String(), "test"}, + IssueTime: time.Unix(0, 0), + ExpiryTime: time.Unix(0, 0).Add(time.Hour), + MaxLifeTime: time.Unix(0, 0).Add(time.Hour * 24), + TokenID: "some-token", + HMAC: []byte{1, 2, 3, 4}, + }, + }, + { + Renewers: []Principal{}, + DelegationToken: DelegationToken{ + Owner: Principal{user.String(), "other"}, + IssueTime: time.Unix(0, 0), + ExpiryTime: time.Unix(0, 0).Add(time.Hour), + MaxLifeTime: time.Unix(0, 0).Add(time.Hour * 24), + TokenID: "another-token", + HMAC: []byte{9, 8, 7, 6}, + }, + }, + }, + } + + testResponse(t, "version 0", resp, describeDelegationTokenResponseV0) + + resp.Version = 1 + resp.Tokens[0].Renewers = []Principal{{user.String(), "super"}} + + testResponse(t, "version 1", resp, describeDelegationTokenResponseV1) + + resp.Version = 2 + testResponse(t, "version 2", resp, describeDelegationTokenResponseV2) + + resp.Version = 3 + resp.Tokens[1].Requester = Principal{user.String(), "requester"} + + testResponse(t, "version 3", resp, describeDelegationTokenResponseV3) +} diff --git a/delegation_token_expire_request.go b/delegation_token_expire_request.go new file mode 100644 index 000000000..b36fee4ad --- /dev/null +++ b/delegation_token_expire_request.go @@ -0,0 +1,76 @@ +package sarama + +import "time" + +type ExpireDelegationTokenRequest struct { + Version int16 + HMAC []byte + ExpiryPeriod time.Duration +} + +func (e *ExpireDelegationTokenRequest) encode(pe packetEncoder) error { + if e.Version > 1 { + if err := pe.putCompactBytes(e.HMAC); err != nil { + return err + } + } else { + if err := pe.putBytes(e.HMAC); err != nil { + return err + } + } + + pe.putInt64(e.ExpiryPeriod.Milliseconds()) + + if e.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (e *ExpireDelegationTokenRequest) decode(pd packetDecoder, version int16) (err error) { + e.Version = version + + if version > 1 { + if e.HMAC, err = pd.getCompactBytes(); err != nil { + return err + } + } else { + if e.HMAC, err = pd.getBytes(); err != nil { + return err + } + } + + var ms int64 + if ms, err = pd.getInt64(); err == nil { + e.ExpiryPeriod = time.Duration(ms) * time.Millisecond + } + + if version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + return err +} + +func (e *ExpireDelegationTokenRequest) key() int16 { + return 40 +} + +func (e *ExpireDelegationTokenRequest) version() int16 { + return e.Version +} + +func (e *ExpireDelegationTokenRequest) headerVersion() int16 { + if e.Version > 1 { + return 2 + } + return 1 +} + +func (e *ExpireDelegationTokenRequest) isValidVersion() bool { + return e.Version >= 0 && e.Version <= 2 +} + +func (e *ExpireDelegationTokenRequest) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delegation_token_expire_request_test.go b/delegation_token_expire_request_test.go new file mode 100644 index 000000000..4dfe9c412 --- /dev/null +++ b/delegation_token_expire_request_test.go @@ -0,0 +1,40 @@ +package sarama + +import ( + "testing" +) + +var ( + expireDelegationTokenRequestV0 = []byte{ + 0, 0, 0, 5, + 0, 2, 4, 6, 8, + 0, 0, 0, 0, 0, 0, 0, 0, + } + + expireDelegationTokenRequestV1 = []byte{ + 0, 0, 0, 8, + 0, 2, 4, 6, 8, 10, 12, 14, + 0, 0, 0, 0, 0, 0, 0, 0, + } + + expireDelegationTokenRequestV2 = []byte{ + 9, + 0, 2, 4, 6, 8, 10, 12, 14, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, + } +) + +func TestExpireDelegationTokenRequest(t *testing.T) { + resp := &ExpireDelegationTokenRequest{HMAC: []byte{0, 2, 4, 6, 8}} + + testRequest(t, "version 0", resp, expireDelegationTokenRequestV0) + + resp.Version = 1 + resp.HMAC = append(resp.HMAC, 10, 12, 14) + + testRequest(t, "version 1", resp, expireDelegationTokenRequestV1) + + resp.Version = 2 + testRequest(t, "version 2", resp, expireDelegationTokenRequestV2) +} diff --git a/delegation_token_expire_response.go b/delegation_token_expire_response.go new file mode 100644 index 000000000..1d698a4fb --- /dev/null +++ b/delegation_token_expire_response.go @@ -0,0 +1,72 @@ +package sarama + +import "time" + +type ExpireDelegationTokenResponse struct { + Version int16 + ErrorCode KError + ExpiryTime time.Time + ThrottleTime time.Duration +} + +func (e *ExpireDelegationTokenResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(e.ErrorCode)) + pe.putInt64(e.ExpiryTime.UnixMilli()) + pe.putInt32(int32(e.ThrottleTime / time.Millisecond)) + + if e.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (e *ExpireDelegationTokenResponse) decode(pd packetDecoder, version int16) (err error) { + e.Version = version + + if errCode, err := pd.getInt16(); err == nil { + e.ErrorCode = KError(errCode) + } else { + return err + } + + if ms, err := pd.getInt64(); err == nil { + e.ExpiryTime = time.UnixMilli(ms) + } else { + return err + } + + var throttle int32 + if throttle, err = pd.getInt32(); err == nil { + e.ThrottleTime = time.Duration(throttle) * time.Millisecond + } + + if e.Version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + + return err +} + +func (e *ExpireDelegationTokenResponse) key() int16 { + return 40 +} + +func (e *ExpireDelegationTokenResponse) version() int16 { + return e.Version +} + +func (e *ExpireDelegationTokenResponse) headerVersion() int16 { + if e.Version > 1 { + return 1 + } + return 0 +} + +func (e *ExpireDelegationTokenResponse) isValidVersion() bool { + return e.Version >= 0 && e.Version <= 2 +} + +func (e *ExpireDelegationTokenResponse) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delegation_token_expire_response_test.go b/delegation_token_expire_response_test.go new file mode 100644 index 000000000..6722fd2ef --- /dev/null +++ b/delegation_token_expire_response_test.go @@ -0,0 +1,42 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + expireDelegationTokenResponseV0 = []byte{ + 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, // Expiry time + 0, 0, 0, 0, // Throttle time + } + + expireDelegationTokenResponseV1 = []byte{ + 0, 0, + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, // Throttle time + } + + expireDelegationTokenResponseV2 = []byte{ + 0, 0, + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, // Throttle time + 0, // Tag buffer + } +) + +func TestExpireDelegationTokenResponse(t *testing.T) { + resp := &ExpireDelegationTokenResponse{ExpiryTime: time.Unix(0, 0)} + + testResponse(t, "version 0", resp, expireDelegationTokenResponseV0) + + resp.Version = 1 + resp.ExpiryTime = resp.ExpiryTime.Add(time.Hour) + + testResponse(t, "version 2", resp, expireDelegationTokenResponseV1) + + resp.Version = 2 + testResponse(t, "version 1", resp, expireDelegationTokenResponseV2) + +} diff --git a/delegation_token_renew_request.go b/delegation_token_renew_request.go new file mode 100644 index 000000000..6100dd05c --- /dev/null +++ b/delegation_token_renew_request.go @@ -0,0 +1,76 @@ +package sarama + +import "time" + +type RenewDelegationTokenRequest struct { + Version int16 + HMAC []byte + RenewalPeriod time.Duration +} + +func (r *RenewDelegationTokenRequest) encode(pe packetEncoder) error { + if r.Version > 1 { + if err := pe.putCompactBytes(r.HMAC); err != nil { + return err + } + } else { + if err := pe.putBytes(r.HMAC); err != nil { + return err + } + } + + pe.putInt64(r.RenewalPeriod.Milliseconds()) + + if r.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (r *RenewDelegationTokenRequest) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if version > 1 { + if r.HMAC, err = pd.getCompactBytes(); err != nil { + return err + } + } else { + if r.HMAC, err = pd.getBytes(); err != nil { + return err + } + } + + var ms int64 + if ms, err = pd.getInt64(); err == nil { + r.RenewalPeriod = time.Duration(ms) * time.Millisecond + } + + if version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + return err +} + +func (r *RenewDelegationTokenRequest) key() int16 { + return 39 +} + +func (r *RenewDelegationTokenRequest) version() int16 { + return r.Version +} + +func (r *RenewDelegationTokenRequest) headerVersion() int16 { + if r.Version > 1 { + return 2 + } + return 1 +} + +func (r *RenewDelegationTokenRequest) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *RenewDelegationTokenRequest) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delegation_token_renew_request_test.go b/delegation_token_renew_request_test.go new file mode 100644 index 000000000..4d03afd40 --- /dev/null +++ b/delegation_token_renew_request_test.go @@ -0,0 +1,40 @@ +package sarama + +import ( + "testing" +) + +var ( + renewDelegationTokenRequestV0 = []byte{ + 0, 0, 0, 5, + 0, 2, 4, 6, 8, + 0, 0, 0, 0, 0, 0, 0, 0, + } + + renewDelegationTokenRequestV1 = []byte{ + 0, 0, 0, 8, + 0, 2, 4, 6, 8, 10, 12, 14, + 0, 0, 0, 0, 0, 0, 0, 0, + } + + renewDelegationTokenRequestV2 = []byte{ + 9, + 0, 2, 4, 6, 8, 10, 12, 14, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, + } +) + +func TestRenewDelegationTokenRequest(t *testing.T) { + resp := &RenewDelegationTokenRequest{HMAC: []byte{0, 2, 4, 6, 8}} + + testRequest(t, "version 0", resp, renewDelegationTokenRequestV0) + + resp.Version = 1 + resp.HMAC = append(resp.HMAC, 10, 12, 14) + + testRequest(t, "version 1", resp, renewDelegationTokenRequestV1) + + resp.Version = 2 + testRequest(t, "version 2", resp, renewDelegationTokenRequestV2) +} diff --git a/delegation_token_renew_response.go b/delegation_token_renew_response.go new file mode 100644 index 000000000..c9f25bd4e --- /dev/null +++ b/delegation_token_renew_response.go @@ -0,0 +1,72 @@ +package sarama + +import "time" + +type RenewDelegationTokenResponse struct { + Version int16 + ErrorCode KError + ExpiryTime time.Time + ThrottleTime time.Duration +} + +func (r *RenewDelegationTokenResponse) encode(pe packetEncoder) error { + pe.putInt16(int16(r.ErrorCode)) + pe.putInt64(r.ExpiryTime.UnixMilli()) + pe.putInt32(int32(r.ThrottleTime / time.Millisecond)) + + if r.Version > 1 { + pe.putEmptyTaggedFieldArray() + } + + return nil +} + +func (r *RenewDelegationTokenResponse) decode(pd packetDecoder, version int16) (err error) { + r.Version = version + + if errCode, err := pd.getInt16(); err == nil { + r.ErrorCode = KError(errCode) + } else { + return err + } + + if ms, err := pd.getInt64(); err == nil { + r.ExpiryTime = time.UnixMilli(ms) + } else { + return err + } + + var throttle int32 + if throttle, err = pd.getInt32(); err == nil { + r.ThrottleTime = time.Duration(throttle) * time.Millisecond + } + + if r.Version > 1 && err == nil { + _, err = pd.getEmptyTaggedFieldArray() + } + + return err +} + +func (r *RenewDelegationTokenResponse) key() int16 { + return 39 +} + +func (r *RenewDelegationTokenResponse) version() int16 { + return r.Version +} + +func (r *RenewDelegationTokenResponse) headerVersion() int16 { + if r.Version > 1 { + return 1 + } + return 0 +} + +func (r *RenewDelegationTokenResponse) isValidVersion() bool { + return r.Version >= 0 && r.Version <= 2 +} + +func (r *RenewDelegationTokenResponse) requiredVersion() KafkaVersion { + return V1_1_0_0 +} diff --git a/delegation_token_renew_response_test.go b/delegation_token_renew_response_test.go new file mode 100644 index 000000000..b5ac9b33b --- /dev/null +++ b/delegation_token_renew_response_test.go @@ -0,0 +1,42 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + renewDelegationTokenResponseV0 = []byte{ + 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, // Expiry time + 0, 0, 0, 0, // Throttle time + } + + renewDelegationTokenResponseV1 = []byte{ + 0, 0, + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, // Throttle time + } + + renewDelegationTokenResponseV2 = []byte{ + 0, 0, + 0, 0, 0, 0, 0, 54, 238, 128, // Expiry time + 0, 0, 0, 0, // Throttle time + 0, // Tag buffer + } +) + +func TestRenewDelegationTokenResponse(t *testing.T) { + resp := &RenewDelegationTokenResponse{ExpiryTime: time.Unix(0, 0)} + + testResponse(t, "version 0", resp, renewDelegationTokenResponseV0) + + resp.Version = 1 + resp.ExpiryTime = resp.ExpiryTime.Add(time.Hour) + + testResponse(t, "version 2", resp, renewDelegationTokenResponseV1) + + resp.Version = 2 + testResponse(t, "version 1", resp, renewDelegationTokenResponseV2) + +} diff --git a/delegation_token_types.go b/delegation_token_types.go new file mode 100644 index 000000000..3ca5f6ffc --- /dev/null +++ b/delegation_token_types.go @@ -0,0 +1,114 @@ +package sarama + +import "time" + +type DelegationToken struct { + Owner Principal + Requester Principal + IssueTime time.Time + ExpiryTime time.Time + MaxLifeTime time.Time + TokenID string + HMAC []byte +} + +func (d *DelegationToken) encode(pe packetEncoder, version int16) error { + if err := d.Owner.encode(pe, version); err != nil { + return err + } + if version > 2 { + if err := d.Requester.encode(pe, version); err != nil { + return err + } + } + pe.putInt64(d.IssueTime.UnixMilli()) + pe.putInt64(d.ExpiryTime.UnixMilli()) + pe.putInt64(d.MaxLifeTime.UnixMilli()) + + if version < 2 { + if err := pe.putString(d.TokenID); err != nil { + return err + } + if err := pe.putBytes(d.HMAC); err != nil { + return err + } + } else { + if err := pe.putCompactString(d.TokenID); err != nil { + return err + } + if err := pe.putCompactBytes(d.HMAC); err != nil { + return err + } + } + return nil +} + +func (d *DelegationToken) decode(pd packetDecoder, version int16) (err error) { + if err := d.Owner.decode(pd, version); err != nil { + return err + } + if version > 2 { + if err := d.Requester.decode(pd, version); err != nil { + return err + } + } + + for _, f := range []*time.Time{&d.IssueTime, &d.ExpiryTime, &d.MaxLifeTime} { + if ms, err := pd.getInt64(); err == nil { + *f = time.UnixMilli(ms) + } else { + return err + } + } + + if version < 2 { + if d.TokenID, err = pd.getString(); err != nil { + return err + } + if d.HMAC, err = pd.getBytes(); err != nil { + return err + } + } else { + if d.TokenID, err = pd.getCompactString(); err != nil { + return err + } + if d.HMAC, err = pd.getCompactBytes(); err != nil { + return err + } + } + + return nil +} + +type Principal struct { + PrincipalType string + Name string +} + +func (k *Principal) encode(pe packetEncoder, version int16) (err error) { + f := func(s string) error { + if version < 2 { + return pe.putString(s) + } + return pe.putCompactString(s) + } + + if err = f(k.PrincipalType); err == nil { + err = f(k.Name) + } + return err +} + +func (k *Principal) decode(pd packetDecoder, version int16) (err error) { + f := func() (string, error) { + if version < 2 { + return pd.getString() + } + return pd.getCompactString() + } + + if k.PrincipalType, err = f(); err == nil { + k.Name, err = f() + } + return err +} diff --git a/request.go b/request.go index e8e74ca34..5a070378a 100644 --- a/request.go +++ b/request.go @@ -188,10 +188,14 @@ func allocateBody(key, version int16) protocolBody { return &SaslAuthenticateRequest{Version: version} case 37: return &CreatePartitionsRequest{Version: version} - // 38: CreateDelegationTokenRequest - // 39: RenewDelegationTokenRequest - // 40: ExpireDelegationTokenRequest - // 41: DescribeDelegationTokenRequest + case 38: + return &CreateDelegationTokenRequest{Version: version} + case 39: + return &RenewDelegationTokenRequest{Version: version} + case 40: + return &ExpireDelegationTokenRequest{Version: version} + case 41: + return &DescribeDelegationTokenRequest{Version: version} case 42: return &DeleteGroupsRequest{Version: version} // 43: ElectLeadersRequest diff --git a/request_test.go b/request_test.go index 70b09a89a..ae757db51 100644 --- a/request_test.go +++ b/request_test.go @@ -152,6 +152,14 @@ func allocateResponseBody(req protocolBody) protocolBody { return &SaslAuthenticateResponse{Version: version} case 37: return &CreatePartitionsResponse{Version: version} + case 38: + return &CreateDelegationTokenRequest{Version: version} + case 39: + return &RenewDelegationTokenRequest{Version: version} + case 40: + return &ExpireDelegationTokenRequest{Version: version} + case 41: + return &DescribeDelegationTokenRequest{Version: version} case 42: return &DeleteGroupsResponse{Version: version} case 44: