From f4ca0b48296538eb5639027cd83f40a3828caf7d Mon Sep 17 00:00:00 2001 From: Peter Dannemann <28637185+petedannemann@users.noreply.github.com> Date: Fri, 28 Jul 2023 12:54:10 -0400 Subject: [PATCH] Support describeacls (#1166) * Support describeacls * gofmt -s -w createacl_test.go * make test diff smaller and fix protocl api key * fix another protocol api key * improve test name * protocol fixes * add missing patterntype * fix createacls protocol * fix tags and add tagged fields back in * bump createacls version to v3 * wip * just one filter, not a list of filters * add missing patterntype in test * fix patterntype location * add prototests * createacl_test.go -> createacls_test.go * seperate createacls_test and describeacls_test * fix describeaclstest * add comment for ResourcePatternTypeFilter --- createacl_test.go => createacls_test.go | 11 +- describeacls.go | 107 +++++++++++++++ describeacls_test.go | 88 ++++++++++++ protocol/createacls/createacls.go | 36 +++-- protocol/createacls/createacls_test.go | 115 ++++++++++++++++ protocol/describeacls/describeacls.go | 72 ++++++++++ protocol/describeacls/describeacls_test.go | 149 +++++++++++++++++++++ 7 files changed, 560 insertions(+), 18 deletions(-) rename createacl_test.go => createacls_test.go (79%) create mode 100644 describeacls.go create mode 100644 describeacls_test.go create mode 100644 protocol/createacls/createacls_test.go create mode 100644 protocol/describeacls/describeacls.go create mode 100644 protocol/describeacls/describeacls_test.go diff --git a/createacl_test.go b/createacls_test.go similarity index 79% rename from createacl_test.go rename to createacls_test.go index 4f4b15380..ee04779ea 100644 --- a/createacl_test.go +++ b/createacls_test.go @@ -15,7 +15,10 @@ func TestClientCreateACLs(t *testing.T) { client, shutdown := newLocalClient() defer shutdown() - res, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ + topic := makeTopic() + group := makeGroupID() + + createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ ACLs: []ACLEntry{ { Principal: "User:alice", @@ -23,7 +26,7 @@ func TestClientCreateACLs(t *testing.T) { Operation: ACLOperationTypeRead, ResourceType: ResourceTypeTopic, ResourcePatternType: PatternTypeLiteral, - ResourceName: "fake-topic-for-alice", + ResourceName: topic, Host: "*", }, { @@ -32,7 +35,7 @@ func TestClientCreateACLs(t *testing.T) { Operation: ACLOperationTypeRead, ResourceType: ResourceTypeGroup, ResourcePatternType: PatternTypeLiteral, - ResourceName: "fake-group-for-bob", + ResourceName: group, Host: "*", }, }, @@ -41,7 +44,7 @@ func TestClientCreateACLs(t *testing.T) { t.Fatal(err) } - for _, err := range res.Errors { + for _, err := range createRes.Errors { if err != nil { t.Error(err) } diff --git a/describeacls.go b/describeacls.go new file mode 100644 index 000000000..d1093bbed --- /dev/null +++ b/describeacls.go @@ -0,0 +1,107 @@ +package kafka + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/segmentio/kafka-go/protocol/describeacls" +) + +// DescribeACLsRequest represents a request sent to a kafka broker to describe +// existing ACLs. +type DescribeACLsRequest struct { + // Address of the kafka broker to send the request to. + Addr net.Addr + + // Filter to filter ACLs on. + Filter ACLFilter +} + +type ACLFilter struct { + ResourceTypeFilter ResourceType + ResourceNameFilter string + // ResourcePatternTypeFilter was added in v1 and is not available prior to that. + ResourcePatternTypeFilter PatternType + PrincipalFilter string + HostFilter string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +// DescribeACLsResponse represents a response from a kafka broker to an ACL +// describe request. +type DescribeACLsResponse struct { + // The amount of time that the broker throttled the request. + Throttle time.Duration + + // Error that occurred while attempting to describe + // the ACLs. + Error error + + // ACL resources returned from the describe request. + Resources []ACLResource +} + +type ACLResource struct { + ResourceType ResourceType + ResourceName string + PatternType PatternType + ACLs []ACLDescription +} + +type ACLDescription struct { + Principal string + Host string + Operation ACLOperationType + PermissionType ACLPermissionType +} + +func (c *Client) DescribeACLs(ctx context.Context, req *DescribeACLsRequest) (*DescribeACLsResponse, error) { + m, err := c.roundTrip(ctx, req.Addr, &describeacls.Request{ + Filter: describeacls.ACLFilter{ + ResourceTypeFilter: int8(req.Filter.ResourceTypeFilter), + ResourceNameFilter: req.Filter.ResourceNameFilter, + ResourcePatternTypeFilter: int8(req.Filter.ResourcePatternTypeFilter), + PrincipalFilter: req.Filter.PrincipalFilter, + HostFilter: req.Filter.HostFilter, + Operation: int8(req.Filter.Operation), + PermissionType: int8(req.Filter.PermissionType), + }, + }) + if err != nil { + return nil, fmt.Errorf("kafka.(*Client).DescribeACLs: %w", err) + } + + res := m.(*describeacls.Response) + resources := make([]ACLResource, len(res.Resources)) + + for resourceIdx, respResource := range res.Resources { + descriptions := make([]ACLDescription, len(respResource.ACLs)) + + for descriptionIdx, respDescription := range respResource.ACLs { + descriptions[descriptionIdx] = ACLDescription{ + Principal: respDescription.Principal, + Host: respDescription.Host, + Operation: ACLOperationType(respDescription.Operation), + PermissionType: ACLPermissionType(respDescription.PermissionType), + } + } + + resources[resourceIdx] = ACLResource{ + ResourceType: ResourceType(respResource.ResourceType), + ResourceName: respResource.ResourceName, + PatternType: PatternType(respResource.PatternType), + ACLs: descriptions, + } + } + + ret := &DescribeACLsResponse{ + Throttle: makeDuration(res.ThrottleTimeMs), + Error: makeError(res.ErrorCode, res.ErrorMessage), + Resources: resources, + } + + return ret, nil +} diff --git a/describeacls_test.go b/describeacls_test.go new file mode 100644 index 000000000..25585b25c --- /dev/null +++ b/describeacls_test.go @@ -0,0 +1,88 @@ +package kafka + +import ( + "context" + "testing" + + ktesting "github.com/segmentio/kafka-go/testing" + "github.com/stretchr/testify/assert" +) + +func TestClientDescribeACLs(t *testing.T) { + if !ktesting.KafkaIsAtLeast("2.0.1") { + return + } + + client, shutdown := newLocalClient() + defer shutdown() + + topic := makeTopic() + group := makeGroupID() + + createRes, err := client.CreateACLs(context.Background(), &CreateACLsRequest{ + ACLs: []ACLEntry{ + { + Principal: "User:alice", + PermissionType: ACLPermissionTypeAllow, + Operation: ACLOperationTypeRead, + ResourceType: ResourceTypeTopic, + ResourcePatternType: PatternTypeLiteral, + ResourceName: topic, + Host: "*", + }, + { + Principal: "User:bob", + PermissionType: ACLPermissionTypeAllow, + Operation: ACLOperationTypeRead, + ResourceType: ResourceTypeGroup, + ResourcePatternType: PatternTypeLiteral, + ResourceName: group, + Host: "*", + }, + }, + }) + if err != nil { + t.Fatal(err) + } + + for _, err := range createRes.Errors { + if err != nil { + t.Error(err) + } + } + + describeResp, err := client.DescribeACLs(context.Background(), &DescribeACLsRequest{ + Filter: ACLFilter{ + ResourceTypeFilter: ResourceTypeTopic, + ResourceNameFilter: topic, + ResourcePatternTypeFilter: PatternTypeLiteral, + Operation: ACLOperationTypeRead, + PermissionType: ACLPermissionTypeAllow, + }, + }) + if err != nil { + t.Fatal(err) + } + + expectedDescribeResp := DescribeACLsResponse{ + Throttle: 0, + Error: makeError(0, ""), + Resources: []ACLResource{ + { + ResourceType: ResourceTypeTopic, + ResourceName: topic, + PatternType: PatternTypeLiteral, + ACLs: []ACLDescription{ + { + Principal: "User:alice", + Host: "*", + Operation: ACLOperationTypeRead, + PermissionType: ACLPermissionTypeAllow, + }, + }, + }, + }, + } + + assert.Equal(t, expectedDescribeResp, *describeResp) +} diff --git a/protocol/createacls/createacls.go b/protocol/createacls/createacls.go index 893be44dd..aad0cc07c 100644 --- a/protocol/createacls/createacls.go +++ b/protocol/createacls/createacls.go @@ -9,9 +9,9 @@ func init() { type Request struct { // We need at least one tagged field to indicate that v2+ uses "flexible" // messages. - _ struct{} `kafka:"min=v2,max=v2,tag"` + _ struct{} `kafka:"min=v2,max=v3,tag"` - Creations []RequestACLs `kafka:"min=v0,max=v2"` + Creations []RequestACLs `kafka:"min=v0,max=v3"` } func (r *Request) ApiKey() protocol.ApiKey { return protocol.CreateAcls } @@ -21,29 +21,37 @@ func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { } type RequestACLs struct { - ResourceType int8 `kafka:"min=v0,max=v2"` - ResourceName string `kafka:"min=v0,max=v2"` - ResourcePatternType int8 `kafka:"min=v0,max=v2"` - Principal string `kafka:"min=v0,max=v2"` - Host string `kafka:"min=v0,max=v2"` - Operation int8 `kafka:"min=v0,max=v2"` - PermissionType int8 `kafka:"min=v0,max=v2"` + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceType int8 `kafka:"min=v0,max=v3"` + ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + ResourcePatternType int8 `kafka:"min=v1,max=v3"` + Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` } type Response struct { // We need at least one tagged field to indicate that v2+ uses "flexible" // messages. - _ struct{} `kafka:"min=v2,max=v2,tag"` + _ struct{} `kafka:"min=v2,max=v3,tag"` - ThrottleTimeMs int32 `kafka:"min=v0,max=v2"` - Results []ResponseACLs `kafka:"min=v0,max=v2"` + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + Results []ResponseACLs `kafka:"min=v0,max=v3"` } func (r *Response) ApiKey() protocol.ApiKey { return protocol.CreateAcls } type ResponseACLs struct { - ErrorCode int16 `kafka:"min=v0,max=v2"` - ErrorMessage string `kafka:"min=v0,max=v2,nullable"` + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` } var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/protocol/createacls/createacls_test.go b/protocol/createacls/createacls_test.go new file mode 100644 index 000000000..61b48c805 --- /dev/null +++ b/protocol/createacls/createacls_test.go @@ -0,0 +1,115 @@ +package createacls_test + +import ( + "testing" + + "github.com/segmentio/kafka-go/protocol/createacls" + "github.com/segmentio/kafka-go/protocol/prototest" +) + +const ( + v0 = 0 + v1 = 1 + v2 = 2 + v3 = 3 +) + +func TestCreateACLsRequest(t *testing.T) { + prototest.TestRequest(t, v0, &createacls.Request{ + Creations: []createacls.RequestACLs{ + { + Principal: "User:alice", + PermissionType: 3, + Operation: 3, + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + Host: "*", + }, + }, + }) + + prototest.TestRequest(t, v1, &createacls.Request{ + Creations: []createacls.RequestACLs{ + { + Principal: "User:alice", + PermissionType: 3, + Operation: 3, + ResourceType: 2, + ResourcePatternType: 3, + ResourceName: "fake-topic-for-alice", + Host: "*", + }, + }, + }) + + prototest.TestRequest(t, v2, &createacls.Request{ + Creations: []createacls.RequestACLs{ + { + Principal: "User:alice", + PermissionType: 3, + Operation: 3, + ResourceType: 2, + ResourcePatternType: 3, + ResourceName: "fake-topic-for-alice", + Host: "*", + }, + }, + }) + + prototest.TestRequest(t, v3, &createacls.Request{ + Creations: []createacls.RequestACLs{ + { + Principal: "User:alice", + PermissionType: 3, + Operation: 3, + ResourceType: 2, + ResourcePatternType: 3, + ResourceName: "fake-topic-for-alice", + Host: "*", + }, + }, + }) +} + +func TestCreateACLsResponse(t *testing.T) { + prototest.TestResponse(t, v0, &createacls.Response{ + ThrottleTimeMs: 1, + Results: []createacls.ResponseACLs{ + { + ErrorCode: 1, + ErrorMessage: "foo", + }, + }, + }) + + prototest.TestResponse(t, v1, &createacls.Response{ + ThrottleTimeMs: 1, + Results: []createacls.ResponseACLs{ + { + ErrorCode: 1, + ErrorMessage: "foo", + }, + }, + }) + + prototest.TestResponse(t, v2, &createacls.Response{ + ThrottleTimeMs: 1, + Results: []createacls.ResponseACLs{ + { + ErrorCode: 1, + ErrorMessage: "foo", + }, + }, + }) + + prototest.TestResponse(t, v3, &createacls.Response{ + ThrottleTimeMs: 1, + Results: []createacls.ResponseACLs{ + { + ErrorCode: 1, + ErrorMessage: "foo", + }, + }, + }) + +} diff --git a/protocol/describeacls/describeacls.go b/protocol/describeacls/describeacls.go new file mode 100644 index 000000000..93a7d2ed7 --- /dev/null +++ b/protocol/describeacls/describeacls.go @@ -0,0 +1,72 @@ +package describeacls + +import "github.com/segmentio/kafka-go/protocol" + +func init() { + protocol.Register(&Request{}, &Response{}) +} + +type Request struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + Filter ACLFilter `kafka:"min=v0,max=v3"` +} + +func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeAcls } + +func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { + return cluster.Brokers[cluster.Controller], nil +} + +type ACLFilter struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceTypeFilter int8 `kafka:"min=v0,max=v3"` + ResourceNameFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + ResourcePatternTypeFilter int8 `kafka:"min=v1,max=v3"` + PrincipalFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + HostFilter string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +type Response struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ThrottleTimeMs int32 `kafka:"min=v0,max=v3"` + ErrorCode int16 `kafka:"min=v0,max=v3"` + ErrorMessage string `kafka:"min=v0,max=v1,nullable|min=v2,max=v3,nullable,compact"` + Resources []Resource `kafka:"min=v0,max=v3"` +} + +func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeAcls } + +type Resource struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + ResourceType int8 `kafka:"min=v0,max=v3"` + ResourceName string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + PatternType int8 `kafka:"min=v1,max=v3"` + ACLs []ResponseACL `kafka:"min=v0,max=v3"` +} + +type ResponseACL struct { + // We need at least one tagged field to indicate that v2+ uses "flexible" + // messages. + _ struct{} `kafka:"min=v2,max=v3,tag"` + + Principal string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Host string `kafka:"min=v0,max=v1|min=v2,max=v3,compact"` + Operation int8 `kafka:"min=v0,max=v3"` + PermissionType int8 `kafka:"min=v0,max=v3"` +} + +var _ protocol.BrokerMessage = (*Request)(nil) diff --git a/protocol/describeacls/describeacls_test.go b/protocol/describeacls/describeacls_test.go new file mode 100644 index 000000000..8fd45fffc --- /dev/null +++ b/protocol/describeacls/describeacls_test.go @@ -0,0 +1,149 @@ +package describeacls_test + +import ( + "testing" + + "github.com/segmentio/kafka-go/protocol/describeacls" + "github.com/segmentio/kafka-go/protocol/prototest" +) + +const ( + v0 = 0 + v1 = 1 + v2 = 2 + v3 = 3 +) + +func TestDescribeACLsRequest(t *testing.T) { + prototest.TestRequest(t, v0, &describeacls.Request{ + Filter: describeacls.ACLFilter{ + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }) + + prototest.TestRequest(t, v1, &describeacls.Request{ + Filter: describeacls.ACLFilter{ + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + ResourcePatternTypeFilter: 0, + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }) + + prototest.TestRequest(t, v2, &describeacls.Request{ + Filter: describeacls.ACLFilter{ + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + ResourcePatternTypeFilter: 0, + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }) + + prototest.TestRequest(t, v3, &describeacls.Request{ + Filter: describeacls.ACLFilter{ + ResourceTypeFilter: 2, + ResourceNameFilter: "fake-topic-for-alice", + ResourcePatternTypeFilter: 0, + PrincipalFilter: "User:alice", + HostFilter: "*", + Operation: 3, + PermissionType: 3, + }, + }) +} + +func TestDescribeACLsResponse(t *testing.T) { + prototest.TestResponse(t, v0, &describeacls.Response{ + ThrottleTimeMs: 1, + ErrorCode: 1, + ErrorMessage: "foo", + Resources: []describeacls.Resource{ + { + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + ACLs: []describeacls.ResponseACL{ + { + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) + + prototest.TestResponse(t, v1, &describeacls.Response{ + ThrottleTimeMs: 1, + ErrorCode: 1, + ErrorMessage: "foo", + Resources: []describeacls.Resource{ + { + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + PatternType: 3, + ACLs: []describeacls.ResponseACL{ + { + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) + + prototest.TestResponse(t, v2, &describeacls.Response{ + ThrottleTimeMs: 1, + ErrorCode: 1, + ErrorMessage: "foo", + Resources: []describeacls.Resource{ + { + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + PatternType: 3, + ACLs: []describeacls.ResponseACL{ + { + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) + + prototest.TestResponse(t, v3, &describeacls.Response{ + ThrottleTimeMs: 1, + ErrorCode: 1, + ErrorMessage: "foo", + Resources: []describeacls.Resource{ + { + ResourceType: 2, + ResourceName: "fake-topic-for-alice", + PatternType: 3, + ACLs: []describeacls.ResponseACL{ + { + Principal: "User:alice", + Host: "*", + Operation: 3, + PermissionType: 3, + }, + }, + }, + }, + }) +}