From 80fea25df76a7a73b811db456cf053d7e005d0e5 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Mon, 6 Jan 2025 21:26:17 +0000 Subject: [PATCH 1/3] fix: correct error message for COORDINATOR_NOT_AVAILABLE Signed-off-by: Dominic Evans --- errors.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/errors.go b/errors.go index 2c431aecb..842d30257 100644 --- a/errors.go +++ b/errors.go @@ -304,7 +304,7 @@ func (err KError) Error() string { case ErrOffsetsLoadInProgress: return "kafka server: The coordinator is still loading offsets and cannot currently process requests" case ErrConsumerCoordinatorNotAvailable: - return "kafka server: Offset's topic has not yet been created" + return "kafka server: The coordinator is not available" case ErrNotCoordinatorForConsumer: return "kafka server: Request was for a consumer group that is not coordinated by this broker" case ErrInvalidTopic: From 060bb3f8da230bb60f00c5de51bc21ce8d7bfa91 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Sun, 5 Jan 2025 11:52:58 +0000 Subject: [PATCH 2/3] fix(admin): add retries for GroupCoordinator errors - retry admin operations that rely on talking to the group coordinator for a given ID if the coordinator has changed from the cached value, or is not available - also fixup controller retry in ElectLeaders to use the err from response rather than sendAndReceive - also rename isErrNotController to isRetriableControllerError for consistency Signed-off-by: Dominic Evans --- admin.go | 221 ++++++++++++++++++++++++++------------- functional_admin_test.go | 76 ++++++++++++++ 2 files changed, 226 insertions(+), 71 deletions(-) diff --git a/admin.go b/admin.go index 6549c7e6f..6f39b5b01 100644 --- a/admin.go +++ b/admin.go @@ -3,6 +3,7 @@ package sarama import ( "errors" "fmt" + "io" "math/rand" "strconv" "sync" @@ -144,6 +145,10 @@ type ClusterAdmin interface { // locally cached value if it's available. Controller() (*Broker, error) + // Coordinator returns the coordinating broker for a consumer group. It will + // return a locally cached value if it's available. + Coordinator(group string) (*Broker, error) + // Remove members from the consumer group by given member identities. // This operation is supported by brokers with version 2.3 or higher // This is for static membership feature. KIP-345 @@ -195,14 +200,25 @@ func (ca *clusterAdmin) Controller() (*Broker, error) { return ca.client.Controller() } +func (ca *clusterAdmin) Coordinator(group string) (*Broker, error) { + return ca.client.Coordinator(group) +} + func (ca *clusterAdmin) refreshController() (*Broker, error) { return ca.client.RefreshController() } -// isErrNotController returns `true` if the given error type unwraps to an -// `ErrNotController` response from Kafka -func isErrNotController(err error) bool { - return errors.Is(err, ErrNotController) +// isRetriableControllerError returns `true` if the given error type unwraps to +// an `ErrNotController` or `EOF` response from Kafka +func isRetriableControllerError(err error) bool { + return errors.Is(err, ErrNotController) || errors.Is(err, io.EOF) +} + +// isRetriableGroupCoordinatorError returns `true` if the given error type +// unwraps to an `ErrNotCoordinatorForConsumer`, +// `ErrConsumerCoordinatorNotAvailable` or `EOF` response from Kafka +func isRetriableGroupCoordinatorError(err error) bool { + return errors.Is(err, ErrNotCoordinatorForConsumer) || errors.Is(err, ErrConsumerCoordinatorNotAvailable) || errors.Is(err, io.EOF) } // retryOnError will repeatedly call the given (error-returning) func in the @@ -252,7 +268,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -269,7 +285,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO } if !errors.Is(topicErr.Err, ErrNoError) { - if errors.Is(topicErr.Err, ErrNotController) { + if isRetriableControllerError(topicErr.Err) { _, _ = ca.refreshController() } return topicErr @@ -281,14 +297,14 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() if err != nil { return err } request := NewMetadataRequest(ca.conf.Version, topics) response, err = controller.GetMetadata(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -301,7 +317,7 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) { var response *MetadataResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { controller, err := ca.Controller() if err != nil { return err @@ -309,7 +325,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32 request := NewMetadataRequest(ca.conf.Version, nil) response, err = controller.GetMetadata(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -441,7 +457,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error { request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -485,7 +501,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [ request.Version = 1 } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -526,7 +542,7 @@ func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][ request.AddBlock(topic, int32(i), assignment[i]) } - return ca.retryOnError(isErrNotController, func() error { + return ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -573,7 +589,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in request.AddBlock(topic, partitions) var rsp *ListPartitionReassignmentsResponse - err = ca.retryOnError(isErrNotController, func() error { + err = ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -581,7 +597,7 @@ func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []in _ = b.Open(ca.client.Config()) rsp, err = b.ListPartitionReassignments(request) - if isErrNotController(err) { + if isRetriableControllerError(err) { _, _ = ca.refreshController() } return err @@ -924,7 +940,7 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s } var res *ElectLeadersResponse - err := ca.retryOnError(isErrNotController, func() error { + if err := ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -932,12 +948,17 @@ func (ca *clusterAdmin) ElectLeaders(electionType ElectionType, partitions map[s _ = b.Open(ca.client.Config()) res, err = b.ElectLeaders(request) - if isErrNotController(err) { - _, _ = ca.refreshController() + if err != nil { + return err } - return err - }) - if err != nil { + if !errors.Is(res.ErrorCode, ErrNoError) { + if isRetriableControllerError(res.ErrorCode) { + _, _ = ca.refreshController() + } + return res.ErrorCode + } + return nil + }); err != nil { return nil, err } return res.ReplicaElectionResults, nil @@ -947,11 +968,11 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group groupsPerBroker := make(map[*Broker][]string) for _, group := range groups { - controller, err := ca.client.Coordinator(group) + coordinator, err := ca.client.Coordinator(group) if err != nil { return nil, err } - groupsPerBroker[controller] = append(groupsPerBroker[controller], group) + groupsPerBroker[coordinator] = append(groupsPerBroker[coordinator], group) } for broker, brokerGroups := range groupsPerBroker { @@ -1043,22 +1064,37 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e } func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return nil, err - } - + var response *OffsetFetchResponse request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error { + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - return coordinator.FetchOffset(request) + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() + + response, err = coordinator.FetchOffset(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + err = response.Err + return err + } + + return nil + }) + + return response, err } func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + var response *DeleteOffsetsResponse request := &DeleteOffsetsRequest{ Group: group, partitions: map[string][]int32{ @@ -1066,27 +1102,37 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa }, } - resp, err := coordinator.DeleteOffsets(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableGroupCoordinatorError, func() error { + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - if !errors.Is(resp.ErrorCode, ErrNoError) { - return resp.ErrorCode - } + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() - if !errors.Is(resp.Errors[topic][partition], ErrNoError) { - return resp.Errors[topic][partition] - } - return nil + response, err = coordinator.DeleteOffsets(request) + if err != nil { + return err + } + if !errors.Is(response.ErrorCode, ErrNoError) { + err = response.ErrorCode + return err + } + if !errors.Is(response.Errors[topic][partition], ErrNoError) { + err = response.Errors[topic][partition] + return err + } + + return nil + }) } func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + var response *DeleteGroupsResponse request := &DeleteGroupsRequest{ Groups: []string{group}, } @@ -1094,21 +1140,35 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request.Version = 1 } - resp, err := coordinator.DeleteGroups(request) - if err != nil { - return err - } + return ca.retryOnError(isRetriableGroupCoordinatorError, func() error { + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } - groupErr, ok := resp.GroupErrorCodes[group] - if !ok { - return ErrIncompleteResponse - } + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() - if !errors.Is(groupErr, ErrNoError) { - return groupErr - } + response, err = coordinator.DeleteGroups(request) + if err != nil { + return err + } - return nil + groupErr, ok := response.GroupErrorCodes[group] + if !ok { + return ErrIncompleteResponse + } + + if !errors.Is(groupErr, ErrNoError) { + err = groupErr + return err + } + + return nil + }) } func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) { @@ -1206,7 +1266,7 @@ func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsU } var rsp *AlterUserScramCredentialsResponse - err := ca.retryOnError(isErrNotController, func() error { + err := ca.retryOnError(isRetriableControllerError, func() error { b, err := ca.Controller() if err != nil { return err @@ -1284,18 +1344,14 @@ func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op Clie return nil } -func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstanceIds []string) (*LeaveGroupResponse, error) { +func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanceIds []string) (*LeaveGroupResponse, error) { if !ca.conf.Version.IsAtLeast(V2_4_0_0) { return nil, ConfigurationError("Removing members from a consumer group headers requires Kafka version of at least v2.4.0") } - - controller, err := ca.client.Coordinator(groupId) - if err != nil { - return nil, err - } + var response *LeaveGroupResponse request := &LeaveGroupRequest{ Version: 3, - GroupId: groupId, + GroupId: group, } for _, instanceId := range groupInstanceIds { groupInstanceId := instanceId @@ -1303,5 +1359,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInsta GroupInstanceId: &groupInstanceId, }) } - return controller.LeaveGroup(request) + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error { + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + + defer func() { + if err != nil && isRetriableGroupCoordinatorError(err) { + _ = ca.client.RefreshCoordinator(group) + } + }() + + response, err = coordinator.LeaveGroup(request) + if err != nil { + return err + } + if !errors.Is(response.Err, ErrNoError) { + err = response.Err + return err + } + + return nil + }) + return response, err } diff --git a/functional_admin_test.go b/functional_admin_test.go index 9962bce95..7985a4e44 100644 --- a/functional_admin_test.go +++ b/functional_admin_test.go @@ -3,7 +3,10 @@ package sarama import ( + "context" "testing" + + "github.com/davecgh/go-spew/spew" ) func TestFuncAdminQuotas(t *testing.T) { @@ -180,3 +183,76 @@ func TestFuncAdminDescribeGroups(t *testing.T) { m1.AssertCleanShutdown() m2.AssertCleanShutdown() } + +func TestFuncAdminListConsumerGroupOffsets(t *testing.T) { + checkKafkaVersion(t, "0.8.2.0") + setupFunctionalTest(t) + defer teardownFunctionalTest(t) + + config := NewFunctionalTestConfig() + config.ClientID = t.Name() + client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config) + defer safeClose(t, client) + if err != nil { + t.Fatal(err) + } + + group := testFuncConsumerGroupID(t) + consumerGroup, err := NewConsumerGroupFromClient(group, client) + if err != nil { + t.Fatal(err) + } + defer safeClose(t, consumerGroup) + + offsetMgr, _ := NewOffsetManagerFromClient(group, client) + defer safeClose(t, offsetMgr) + markOffset(t, offsetMgr, "test.4", 0, 2) + offsetMgr.Commit() + + coordinator, err := client.Coordinator(group) + if err != nil { + t.Fatal(err) + } + + t.Logf("coordinator broker %d", coordinator.id) + + adminClient, err := NewClusterAdminFromClient(client) + if err != nil { + t.Fatal(err) + } + { + resp, err := adminClient.ListConsumerGroupOffsets(group, map[string][]int32{"test.4": {0, 1, 2, 3}}) + if err != nil { + t.Fatal(err) + } + t.Log(spew.Sdump(resp)) + } + + brokerID := coordinator.id + if err := stopDockerTestBroker(context.Background(), brokerID); err != nil { + t.Fatal(err) + } + + t.Cleanup( + func() { + if err := startDockerTestBroker(context.Background(), brokerID); err != nil { + t.Fatal(err) + } + }, + ) + + { + resp, err := adminClient.ListConsumerGroupOffsets(group, map[string][]int32{"test.4": {0, 1, 2, 3}}) + if err != nil { + t.Fatal(err) + } + t.Log(spew.Sdump(resp)) + } + + coordinator, err = adminClient.Coordinator(group) + if err != nil { + t.Fatal(err) + } + + t.Logf("coordinator broker %d", coordinator.id) +} From ecbf73af3ad330008903d024fa5bf40958bb9793 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Tue, 7 Jan 2025 09:48:59 +0000 Subject: [PATCH 3/3] fix(admin): use named return on retryOnError func Use a named return for the defer func to inspect to avoid subtle bugs Signed-off-by: Dominic Evans --- admin.go | 64 ++++++++++++++++++++++++++------------------------------ 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/admin.go b/admin.go index 6f39b5b01..8aa1f374e 100644 --- a/admin.go +++ b/admin.go @@ -1066,25 +1066,24 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) { var response *OffsetFetchResponse request := NewOffsetFetchRequest(ca.conf.Version, group, topicPartitions) - err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.FetchOffset(request) if err != nil { return err } if !errors.Is(response.Err, ErrNoError) { - err = response.Err - return err + return response.Err } return nil @@ -1102,29 +1101,27 @@ func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, pa }, } - return ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.DeleteOffsets(request) if err != nil { return err } if !errors.Is(response.ErrorCode, ErrNoError) { - err = response.ErrorCode - return err + return response.ErrorCode } if !errors.Is(response.Errors[topic][partition], ErrNoError) { - err = response.Errors[topic][partition] - return err + return response.Errors[topic][partition] } return nil @@ -1140,18 +1137,18 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { request.Version = 1 } - return ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + return ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.DeleteGroups(request) if err != nil { return err @@ -1163,8 +1160,7 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { } if !errors.Is(groupErr, ErrNoError) { - err = groupErr - return err + return groupErr } return nil @@ -1359,28 +1355,28 @@ func (ca *clusterAdmin) RemoveMemberFromConsumerGroup(group string, groupInstanc GroupInstanceId: &groupInstanceId, }) } - err := ca.retryOnError(isRetriableGroupCoordinatorError, func() error { - coordinator, err := ca.client.Coordinator(group) - if err != nil { - return err - } - + err := ca.retryOnError(isRetriableGroupCoordinatorError, func() (err error) { defer func() { if err != nil && isRetriableGroupCoordinatorError(err) { _ = ca.client.RefreshCoordinator(group) } }() + coordinator, err := ca.client.Coordinator(group) + if err != nil { + return err + } + response, err = coordinator.LeaveGroup(request) if err != nil { return err } if !errors.Is(response.Err, ErrNoError) { - err = response.Err - return err + return response.Err } return nil }) + return response, err }