From 3c731eeb50c847b4297d7640e76a3e4b02f20d7a Mon Sep 17 00:00:00 2001 From: arxon31 Date: Sun, 19 Jan 2025 19:45:52 +0300 Subject: [PATCH 1/6] fix: running partition watcher only for leader --- consumergroup.go | 48 +++++++++++++++++++++++------------------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index f4bb382c..5c2d07d6 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -789,13 +789,9 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { } defer conn.Close() - var generationID int32 - var groupAssignments GroupMemberAssignments - var assignments map[string][]int32 - // join group. this will join the group and prepare assignments if our // consumer is elected leader. it may also change or assign the member ID. - memberID, generationID, groupAssignments, err = cg.joinGroup(conn, memberID) + memberID, generationID, groupAssignments, iAmLeader, err := cg.joinGroup(conn, memberID) if err != nil { cg.withErrorLogger(func(log Logger) { log.Printf("Failed to join group %s: %v", cg.config.ID, err) @@ -803,11 +799,11 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { return memberID, err } cg.withLogger(func(log Logger) { - log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) + log.Printf("Joined group %s as member %s (leader %t) in generation %d", cg.config.ID, memberID, iAmLeader, generationID) }) // sync group - assignments, err = cg.syncGroup(conn, memberID, generationID, groupAssignments) + assignments, err := cg.syncGroup(conn, memberID, generationID, groupAssignments) if err != nil { cg.withErrorLogger(func(log Logger) { log.Printf("Failed to sync group %s: %v", cg.config.ID, err) @@ -843,7 +839,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // any of these functions exit, then the generation is determined to be // complete. gen.heartbeatLoop(cg.config.HeartbeatInterval) - if cg.config.WatchPartitionChanges { + if cg.config.WatchPartitionChanges && iAmLeader { for _, topic := range cg.config.Topics { gen.partitionWatcher(cg.config.PartitionWatchInterval, topic) } @@ -925,16 +921,16 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // the leader. Otherwise, GroupMemberAssignments will be nil. // // Possible kafka error codes returned: -// * GroupLoadInProgress: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * InconsistentGroupProtocol: -// * InvalidSessionTimeout: -// * GroupAuthorizationFailed: -func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { +// - GroupLoadInProgress: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - InconsistentGroupProtocol: +// - InvalidSessionTimeout: +// - GroupAuthorizationFailed: +func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, bool, error) { request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { - return "", 0, nil, err + return "", 0, nil, false, err } response, err := conn.joinGroup(request) @@ -942,7 +938,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i err = Error(response.ErrorCode) } if err != nil { - return "", 0, nil, err + return "", 0, nil, false, err } memberID = response.MemberID @@ -953,10 +949,12 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i }) var assignments GroupMemberAssignments - if iAmLeader := response.MemberID == response.LeaderID; iAmLeader { + iAmLeader := response.MemberID == response.LeaderID + + if iAmLeader { v, err := cg.assignTopicPartitions(conn, response) if err != nil { - return memberID, 0, nil, err + return memberID, 0, nil, false, err } assignments = v @@ -973,7 +971,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID) }) - return memberID, generationID, assignments, nil + return memberID, generationID, assignments, iAmLeader, nil } // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup @@ -1073,11 +1071,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember // Readers subscriptions topic => partitions // // Possible kafka error codes returned: -// * GroupCoordinatorNotAvailable: -// * NotCoordinatorForGroup: -// * IllegalGeneration: -// * RebalanceInProgress: -// * GroupAuthorizationFailed: +// - GroupCoordinatorNotAvailable: +// - NotCoordinatorForGroup: +// - IllegalGeneration: +// - RebalanceInProgress: +// - GroupAuthorizationFailed: func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) { request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) response, err := conn.syncGroup(request) From fa0cc73cf199ba8dda1a22ec63351deaa80cd413 Mon Sep 17 00:00:00 2001 From: arxon31 Date: Sun, 19 Jan 2025 20:13:24 +0300 Subject: [PATCH 2/6] fix: setting partitions number in assignTopicPartitions --- consumergroup.go | 47 ++++++++++++++++++++++++++----------------- consumergroup_test.go | 2 +- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index 5c2d07d6..64c6ae95 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -497,7 +497,7 @@ func (g *Generation) heartbeatLoop(interval time.Duration) { // a bad spot and should rebalance. Commonly you will see an error here if there // is a problem with the connection to the coordinator and a rebalance will // establish a new connection to the coordinator. -func (g *Generation) partitionWatcher(interval time.Duration, topic string) { +func (g *Generation) partitionWatcher(interval time.Duration, topic string, startPartitions int) { g.Start(func(ctx context.Context) { g.log(func(l Logger) { l.Printf("started partition watcher for group, %v, topic %v [%v]", g.GroupID, topic, interval) @@ -509,14 +509,6 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { ticker := time.NewTicker(interval) defer ticker.Stop() - ops, err := g.conn.readPartitions(topic) - if err != nil { - g.logError(func(l Logger) { - l.Printf("Problem getting partitions during startup, %v\n, Returning and setting up nextGeneration", err) - }) - return - } - oParts := len(ops) for { select { case <-ctx.Done(): @@ -525,7 +517,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { ops, err := g.conn.readPartitions(topic) switch { case err == nil, errors.Is(err, UnknownTopicOrPartition): - if len(ops) != oParts { + if len(ops) != startPartitions { g.log(func(l Logger) { l.Printf("Partition changes found, rebalancing group: %v.", g.GroupID) }) @@ -651,11 +643,17 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) { } cg := &ConsumerGroup{ - config: config, - next: make(chan *Generation), - errs: make(chan error), - done: make(chan struct{}), + config: config, + partitionsPerTopic: make(map[string]int, len(config.Topics)), + next: make(chan *Generation), + errs: make(chan error), + done: make(chan struct{}), + } + + for _, topic := range config.Topics { + cg.partitionsPerTopic[topic] = 0 } + cg.wg.Add(1) go func() { cg.run() @@ -670,9 +668,10 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) { // Generation is where partition assignments and offset management occur. // Callers will use Next to get a handle to the Generation. type ConsumerGroup struct { - config ConsumerGroupConfig - next chan *Generation - errs chan error + config ConsumerGroupConfig + partitionsPerTopic map[string]int + next chan *Generation + errs chan error closeOnce sync.Once wg sync.WaitGroup @@ -840,8 +839,8 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // complete. gen.heartbeatLoop(cg.config.HeartbeatInterval) if cg.config.WatchPartitionChanges && iAmLeader { - for _, topic := range cg.config.Topics { - gen.partitionWatcher(cg.config.PartitionWatchInterval, topic) + for topic, startPartitions := range cg.partitionsPerTopic { + gen.partitionWatcher(cg.config.PartitionWatchInterval, topic, startPartitions) } } @@ -1034,6 +1033,16 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup return nil, err } + // resetting old values of partitions per topic + for topic := range cg.partitionsPerTopic { + cg.partitionsPerTopic[topic] = 0 + } + + // setting new values of partitions per topic + for _, partition := range partitions { + cg.partitionsPerTopic[partition.Topic] += 1 + } + cg.withLogger(func(l Logger) { l.Printf("using '%v' balancer to assign group, %v", group.GroupProtocol, cg.config.ID) for _, member := range members { diff --git a/consumergroup_test.go b/consumergroup_test.go index 0d3e290a..884e6546 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -637,7 +637,7 @@ func TestGenerationExitsOnPartitionChange(t *testing.T) { done := make(chan struct{}) go func() { - gen.partitionWatcher(watchTime, "topic-1") + gen.partitionWatcher(watchTime, "topic-1", 1) close(done) }() From 76b3e0cdf6adfdb231e617ab13e0fa23a6035fe0 Mon Sep 17 00:00:00 2001 From: arxon31 Date: Sun, 19 Jan 2025 20:40:47 +0300 Subject: [PATCH 3/6] fixed resetting olda values of partitions per topic --- consumergroup.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index 64c6ae95..a309bd6b 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -1034,9 +1034,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup } // resetting old values of partitions per topic - for topic := range cg.partitionsPerTopic { - cg.partitionsPerTopic[topic] = 0 - } + cg.partitionsPerTopic = make(map[string]int, len(cg.config.Topics)) // setting new values of partitions per topic for _, partition := range partitions { From 3c5b262330101751452d7f5845765d2cfede57c2 Mon Sep 17 00:00:00 2001 From: arxon31 Date: Sun, 2 Feb 2025 12:54:11 +0300 Subject: [PATCH 4/6] cr fix: removed unnecessary map filling --- consumergroup.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index a309bd6b..d01e092a 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -650,10 +650,6 @@ func NewConsumerGroup(config ConsumerGroupConfig) (*ConsumerGroup, error) { done: make(chan struct{}), } - for _, topic := range config.Topics { - cg.partitionsPerTopic[topic] = 0 - } - cg.wg.Add(1) go func() { cg.run() @@ -1034,7 +1030,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup } // resetting old values of partitions per topic - cg.partitionsPerTopic = make(map[string]int, len(cg.config.Topics)) + cg.partitionsPerTopic = make(map[string]int, len(topics)) // setting new values of partitions per topic for _, partition := range partitions { From 914a268c774a625020754830f2f4f23b47d7a2f6 Mon Sep 17 00:00:00 2001 From: arxon31 Date: Sun, 2 Feb 2025 12:59:09 +0300 Subject: [PATCH 5/6] cr fix: removed running partitionWatcher only by leader --- consumergroup.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index d01e092a..c8504fd5 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -786,7 +786,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // join group. this will join the group and prepare assignments if our // consumer is elected leader. it may also change or assign the member ID. - memberID, generationID, groupAssignments, iAmLeader, err := cg.joinGroup(conn, memberID) + memberID, generationID, groupAssignments, err := cg.joinGroup(conn, memberID) if err != nil { cg.withErrorLogger(func(log Logger) { log.Printf("Failed to join group %s: %v", cg.config.ID, err) @@ -794,7 +794,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { return memberID, err } cg.withLogger(func(log Logger) { - log.Printf("Joined group %s as member %s (leader %t) in generation %d", cg.config.ID, memberID, iAmLeader, generationID) + log.Printf("Joined group %s as member %s in generation %d", cg.config.ID, memberID, generationID) }) // sync group @@ -834,7 +834,7 @@ func (cg *ConsumerGroup) nextGeneration(memberID string) (string, error) { // any of these functions exit, then the generation is determined to be // complete. gen.heartbeatLoop(cg.config.HeartbeatInterval) - if cg.config.WatchPartitionChanges && iAmLeader { + if cg.config.WatchPartitionChanges { for topic, startPartitions := range cg.partitionsPerTopic { gen.partitionWatcher(cg.config.PartitionWatchInterval, topic, startPartitions) } @@ -922,10 +922,10 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // - InconsistentGroupProtocol: // - InvalidSessionTimeout: // - GroupAuthorizationFailed: -func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, bool, error) { +func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { - return "", 0, nil, false, err + return "", 0, nil, err } response, err := conn.joinGroup(request) @@ -933,7 +933,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i err = Error(response.ErrorCode) } if err != nil { - return "", 0, nil, false, err + return "", 0, nil, err } memberID = response.MemberID @@ -944,12 +944,11 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i }) var assignments GroupMemberAssignments - iAmLeader := response.MemberID == response.LeaderID - if iAmLeader { + if iAmLeader := response.MemberID == response.LeaderID; iAmLeader { v, err := cg.assignTopicPartitions(conn, response) if err != nil { - return memberID, 0, nil, false, err + return memberID, 0, nil, err } assignments = v @@ -966,7 +965,7 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i l.Printf("joinGroup succeeded for response, %v. generationID=%v, memberID=%v", cg.config.ID, response.GenerationID, response.MemberID) }) - return memberID, generationID, assignments, iAmLeader, nil + return memberID, generationID, assignments, nil } // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup From fbd6927592974b7ee7f82a211d121cdccdc17a4b Mon Sep 17 00:00:00 2001 From: arxon31 Date: Sun, 2 Feb 2025 13:20:37 +0300 Subject: [PATCH 6/6] cr fix: fixed autoformatting --- consumergroup.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/consumergroup.go b/consumergroup.go index c8504fd5..705cc0d0 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -916,12 +916,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // the leader. Otherwise, GroupMemberAssignments will be nil. // // Possible kafka error codes returned: -// - GroupLoadInProgress: -// - GroupCoordinatorNotAvailable: -// - NotCoordinatorForGroup: -// - InconsistentGroupProtocol: -// - InvalidSessionTimeout: -// - GroupAuthorizationFailed: +// * GroupLoadInProgress: +// * GroupCoordinatorNotAvailable: +// * NotCoordinatorForGroup: +// * InconsistentGroupProtocol: +// * InvalidSessionTimeout: +// * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { @@ -1073,11 +1073,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember // Readers subscriptions topic => partitions // // Possible kafka error codes returned: -// - GroupCoordinatorNotAvailable: -// - NotCoordinatorForGroup: -// - IllegalGeneration: -// - RebalanceInProgress: -// - GroupAuthorizationFailed: +// * GroupCoordinatorNotAvailable: +// * NotCoordinatorForGroup: +// * IllegalGeneration: +// * RebalanceInProgress: +// * GroupAuthorizationFailed: func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) { request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments) response, err := conn.syncGroup(request)