Commit ba119e5

optimize tagging controller workqueue handling
1 parent bf18a51 commit ba119e5

File tree

3 files changed, +139 −64 lines changed


pkg/controllers/tagging/metrics.go

+2 −15
@@ -14,23 +14,15 @@ limitations under the License.
 package tagging
 
 import (
+    "sync"
+
     "k8s.io/component-base/metrics"
     "k8s.io/component-base/metrics/legacyregistry"
-    "sync"
 )
 
 var register sync.Once
 
 var (
-    workItemDuration = metrics.NewHistogramVec(
-        &metrics.HistogramOpts{
-            Name:           "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
-            Help:           "workitem latency of workitem being in the queue and time it takes to process",
-            StabilityLevel: metrics.ALPHA,
-            Buckets:        metrics.ExponentialBuckets(0.5, 1.5, 20),
-        },
-        []string{"latency_type"})
-
     workItemError = metrics.NewCounterVec(
         &metrics.CounterOpts{
             Name: "cloudprovider_aws_tagging_controller_work_item_errors_total",
@@ -43,15 +35,10 @@ var (
 // registerMetrics registers tagging-controller metrics.
 func registerMetrics() {
     register.Do(func() {
-        legacyregistry.MustRegister(workItemDuration)
         legacyregistry.MustRegister(workItemError)
     })
 }
 
-func recordWorkItemLatencyMetrics(latencyType string, timeTaken float64) {
-    workItemDuration.With(metrics.Labels{"latency_type": latencyType}).Observe(timeTaken)
-}
-
 func recordWorkItemErrorMetrics(errorType string, instanceID string) {
     workItemError.With(metrics.Labels{"error_type": errorType, "instance_id": instanceID}).Inc()
 }
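
The work_item_duration_seconds histogram and recordWorkItemLatencyMetrics go away because a work item no longer carries its own enqueueTime: the item is now a bare comparable key so the workqueue can deduplicate it (see the controller change below). If queue latency still needs to be observed, client-go's workqueue can publish its own per-queue metrics once a metrics provider is registered in the binary. A minimal sketch, assuming the k8s.io/component-base Prometheus workqueue provider; this wiring is not part of the commit:

package main

import (
    // Blank-importing this package registers a Prometheus metrics provider for all
    // named workqueues, exposing workqueue_depth, workqueue_queue_duration_seconds
    // and workqueue_work_duration_seconds labeled with name="Tagging".
    _ "k8s.io/component-base/metrics/prometheus/workqueue"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging")
    defer q.ShutDown()
    // ... hand q to the controller loop ...
}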

pkg/controllers/tagging/tagging_controller.go

+57 −43
@@ -22,6 +22,7 @@ import (
 
     "golang.org/x/time/rate"
     v1 "k8s.io/api/core/v1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
     utilruntime "k8s.io/apimachinery/pkg/util/runtime"
     "k8s.io/apimachinery/pkg/util/wait"
     coreinformers "k8s.io/client-go/informers/core/v1"
@@ -42,16 +43,21 @@ func init() {
     registerMetrics()
 }
 
-// workItem contains the node and an action for that node
+// taggingControllerNode contains the node details required for tag/untag of node resources.
+type taggingControllerNode struct {
+    providerID string
+    name       string
+}
+
+// workItem contains the node name, provider id and an action for that node.
 type workItem struct {
-    node           *v1.Node
-    action         func(node *v1.Node) error
-    requeuingCount int
-    enqueueTime    time.Time
+    name       string
+    providerID string
+    action     string
 }
 
 func (w workItem) String() string {
-    return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
+    return fmt.Sprintf("[Node: %s, Action: %s]", w.name, w.action)
 }
 
 const (
@@ -62,17 +68,15 @@ const (
     // The label for depicting total number of errors a work item encounter and succeed
     totalErrorsWorkItemErrorMetric = "total_errors"
 
-    // The label for depicting total time when work item gets queued to processed
-    workItemProcessingTimeWorkItemMetric = "work_item_processing_time"
-
-    // The label for depicting total time when work item gets queued to dequeued
-    workItemDequeuingTimeWorkItemMetric = "work_item_dequeuing_time"
-
     // The label for depicting total number of errors a work item encounter and fail
    errorsAfterRetriesExhaustedWorkItemErrorMetric = "errors_after_retries_exhausted"
 
     // The period of time after Node creation to retry tagging due to eventual consistency of the CreateTags API.
     newNodeEventualConsistencyGracePeriod = time.Minute * 5
+
+    addTag = "ADD"
+
+    deleteTag = "DELETE"
 )
 
 // Controller is the controller implementation for tagging cluster resources.
@@ -152,7 +156,7 @@ func NewTaggingController(
     tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj interface{}) {
             node := obj.(*v1.Node)
-            tc.enqueueNode(node, tc.tagNodesResources)
+            tc.enqueueNode(node, addTag)
         },
         UpdateFunc: func(oldObj, newObj interface{}) {
             node := newObj.(*v1.Node)
@@ -165,11 +169,11 @@
                 return
             }
 
-            tc.enqueueNode(node, tc.tagNodesResources)
+            tc.enqueueNode(node, addTag)
         },
         DeleteFunc: func(obj interface{}) {
             node := obj.(*v1.Node)
-            tc.enqueueNode(node, tc.untagNodeResources)
+            tc.enqueueNode(node, deleteTag)
         },
     })
 
@@ -215,21 +219,17 @@ func (tc *Controller) process() bool {
     err := func(obj interface{}) error {
         defer tc.workqueue.Done(obj)
 
-        workItem, ok := obj.(*workItem)
+        workItem, ok := obj.(workItem)
         if !ok {
             tc.workqueue.Forget(obj)
             err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
             utilruntime.HandleError(err)
             return nil
         }
 
-        timeTaken := time.Since(workItem.enqueueTime).Seconds()
-        recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
-        klog.Infof("Dequeuing latency %f seconds", timeTaken)
-
-        instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
+        instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
         if err != nil {
-            err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
+            err = fmt.Errorf("error in getting instanceID for node %s, error: %v", workItem.name, err)
             utilruntime.HandleError(err)
             return nil
         }
@@ -241,26 +241,31 @@ func (tc *Controller) process() bool {
             tc.workqueue.Forget(obj)
             return nil
         }
-
-        err = workItem.action(workItem.node)
-
+        if workItem.action == addTag {
+            err = tc.tagNodesResources(&taggingControllerNode{
+                name:       workItem.name,
+                providerID: workItem.providerID,
+            })
+        } else {
+            err = tc.untagNodeResources(&taggingControllerNode{
+                name:       workItem.name,
+                providerID: workItem.providerID,
+            })
+        }
         if err != nil {
-            if workItem.requeuingCount < maxRequeuingCount {
+            numRetries := tc.workqueue.NumRequeues(workItem)
+            if numRetries < maxRequeuingCount {
                 // Put the item back on the workqueue to handle any transient errors.
-                workItem.requeuingCount++
                 tc.workqueue.AddRateLimited(workItem)
 
                 recordWorkItemErrorMetrics(totalErrorsWorkItemErrorMetric, string(instanceID))
-                return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
+                return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), numRetries)
             }
 
             klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
             recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
         } else {
             klog.Infof("Finished processing %s", workItem)
-            timeTaken = time.Since(workItem.enqueueTime).Seconds()
-            recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
-            klog.Infof("Processing latency %f seconds", timeTaken)
         }
 
         tc.workqueue.Forget(obj)
@@ -277,11 +282,19 @@ func (tc *Controller) process() bool {
 
 // tagNodesResources tag node resources
 // If we want to tag more resources, modify this function appropriately
-func (tc *Controller) tagNodesResources(node *v1.Node) error {
+func (tc *Controller) tagNodesResources(node *taggingControllerNode) error {
     for _, resource := range tc.resources {
         switch resource {
         case opt.Instance:
-            err := tc.tagEc2Instance(node)
+            v1node, err := tc.nodeInformer.Lister().Get(node.name)
+            if err != nil {
+                // If node not found, just ignore it as its okay to not add tags when the node object is deleted.
+                if apierrors.IsNotFound(err) {
+                    return nil
+                }
+                return err
+            }
+            err = tc.tagEc2Instance(v1node)
             if err != nil {
                 return err
             }
@@ -334,7 +347,7 @@ func (tc *Controller) tagEc2Instance(node *v1.Node) error {
 
 // untagNodeResources untag node resources
 // If we want to untag more resources, modify this function appropriately
-func (tc *Controller) untagNodeResources(node *v1.Node) error {
+func (tc *Controller) untagNodeResources(node *taggingControllerNode) error {
     for _, resource := range tc.resources {
         switch resource {
         case opt.Instance:
@@ -350,13 +363,13 @@ func (tc *Controller) untagNodeResources(node *v1.Node) error {
 
 // untagEc2Instances deletes the provided tags to each EC2 instances in
 // the cluster.
-func (tc *Controller) untagEc2Instance(node *v1.Node) error {
-    instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
+func (tc *Controller) untagEc2Instance(node *taggingControllerNode) error {
+    instanceID, _ := awsv1.KubernetesInstanceID(node.providerID).MapToAWSInstanceID()
 
     err := tc.cloud.UntagResource(string(instanceID), tc.tags)
 
     if err != nil {
-        klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
+        klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.name, err)
         return err
     }
 
@@ -367,12 +380,13 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {
 
 // enqueueNode takes in the object and an
 // action for the object for a workitem and enqueue to the workqueue
-func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
-    item := &workItem{
-        node:           node,
-        action:         action,
-        requeuingCount: 0,
-        enqueueTime:    time.Now(),
+func (tc *Controller) enqueueNode(node *v1.Node, action string) {
+    // if the struct has fields which are all comparable then the workqueue add will handle make sure multiple adds of the same object
+    // will only have one item in the workqueue.
+    item := workItem{
+        name:       node.GetName(),
+        providerID: node.Spec.ProviderID,
+        action:     action,
    }
 
     if tc.rateLimitEnabled {
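
The comment added in enqueueNode is the core of this change: workItem is now a plain value whose fields (name, providerID, action) are all comparable, so the workqueue collapses duplicate adds of the same node and action, and per-item retry bookkeeping moves from the removed requeuingCount field into the rate limiter via NumRequeues/Forget. A self-contained sketch of those two behaviors, using only client-go's workqueue package; an illustration under those assumptions, not code from this commit:

package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

// workItem mirrors the controller's new queue key: every field is comparable.
type workItem struct {
    name       string
    providerID string
    action     string
}

func main() {
    q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging")
    defer q.ShutDown()

    item := workItem{name: "node0", providerID: "i-0001", action: "ADD"}
    q.Add(item)
    q.Add(item)          // identical value: collapsed into the existing entry
    fmt.Println(q.Len()) // 1

    q.Add(workItem{name: "node0", providerID: "i-0001", action: "DELETE"})
    fmt.Println(q.Len()) // 2: a different action is a different key

    // Retry accounting lives in the rate limiter, not in the item itself.
    obj, _ := q.Get()
    q.AddRateLimited(obj) // simulate a transient failure: requeue with backoff
    q.Done(obj)
    fmt.Println(q.NumRequeues(obj)) // 1
    q.Forget(obj) // on success, or after giving up, the failure count is reset
}

The two queue lengths printed here (1, then 2) match the assertions TestMultipleEnqueues makes in the test file below.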

pkg/controllers/tagging/tagging_controller_test.go

+80 −6
@@ -22,6 +22,7 @@ import (
     "testing"
     "time"
 
+    "golang.org/x/time/rate"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/informers"
@@ -221,27 +222,32 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
                 nodeMonitorPeriod: 1 * time.Second,
                 tags:              map[string]string{"key2": "value2", "key1": "value1"},
                 resources:         []string{"instance"},
-                workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
-                rateLimitEnabled:  testcase.rateLimited,
+                workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter(
+                    workqueue.NewTypedItemExponentialFailureRateLimiter[any](1*time.Millisecond, 5*time.Millisecond),
+                    // 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
+                    &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+                ), "Tagging"),
+                rateLimitEnabled: testcase.rateLimited,
             }
 
             if testcase.toBeTagged {
-                tc.enqueueNode(testcase.currNode, tc.tagNodesResources)
+                tc.enqueueNode(testcase.currNode, addTag)
             } else {
-                tc.enqueueNode(testcase.currNode, tc.untagNodeResources)
+                tc.enqueueNode(testcase.currNode, deleteTag)
            }
 
             if tc.rateLimitEnabled {
                 // If rate limit is enabled, sleep for 10 ms to wait for the item to be added to the queue since the base delay is 5 ms.
                 time.Sleep(10 * time.Millisecond)
             }
 
+            cnt := 0
             for tc.workqueue.Len() > 0 {
                 tc.process()
-
+                cnt++
                 // sleep briefly because of exponential backoff when requeueing failed workitem
                 // resulting in workqueue to be empty if checked immediately
-                time.Sleep(1500 * time.Millisecond)
+                time.Sleep(7 * time.Millisecond)
             }
 
             for _, msg := range testcase.expectedMessages {
@@ -256,12 +262,80 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
                     if !strings.Contains(logBuf.String(), "requeuing count exceeded") {
                         t.Errorf("\nExceeded requeue count but did not stop: \n%v\n", logBuf.String())
                     }
+                    if cnt != maxRequeuingCount+1 {
+                        t.Errorf("the node got requeued %d, more than the max requeuing count of %d", cnt, maxRequeuingCount)
+                    }
                 }
             }
         })
     }
 }
 
+func TestMultipleEnqueues(t *testing.T) {
+    awsServices := awsv1.NewFakeAWSServices(TestClusterID)
+    fakeAws, _ := awsv1.NewAWSCloud(config.CloudConfig{}, awsServices)
+
+    testNode := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:              "node0",
+            CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+        },
+        Spec: v1.NodeSpec{
+            ProviderID: "i-0001",
+        },
+    }
+    testNode1 := &v1.Node{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:              "node1",
+            CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+        },
+        Spec: v1.NodeSpec{
+            ProviderID: "i-0002",
+        },
+    }
+    clientset := fake.NewSimpleClientset(testNode, testNode1)
+    informer := informers.NewSharedInformerFactory(clientset, time.Second)
+    nodeInformer := informer.Core().V1().Nodes()
+
+    if err := syncNodeStore(nodeInformer, clientset); err != nil {
+        t.Errorf("unexpected error: %v", err)
+    }
+
+    tc, err := NewTaggingController(nodeInformer, clientset, fakeAws, time.Second, nil, []string{}, 0, 0)
+    if err != nil {
+        t.Errorf("unexpected error: %v", err)
+    }
+    tc.enqueueNode(testNode, addTag)
+    if tc.workqueue.Len() != 1 {
+        t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
+    }
+    // adding the same node with similar operation shouldn't add to the workqueue
+    tc.enqueueNode(testNode, addTag)
+    if tc.workqueue.Len() != 1 {
+        t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
+    }
+    // adding the same node with different operation should add to the workqueue
+    tc.enqueueNode(testNode, deleteTag)
+    if tc.workqueue.Len() != 2 {
+        t.Errorf("invalid work queue length, expected 2, got %d", tc.workqueue.Len())
+    }
+    // adding the different node should add to the workqueue
+    tc.enqueueNode(testNode1, addTag)
+    if tc.workqueue.Len() != 3 {
+        t.Errorf("invalid work queue length, expected 3, got %d", tc.workqueue.Len())
+    }
+    // should handle the add tag properly
+    tc.process()
+    if tc.workqueue.Len() != 2 {
+        t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
+    }
+    // should handle the delete tag properly
+    tc.process()
+    if tc.workqueue.Len() != 1 {
+        t.Errorf("invalid work queue length, expected 1, got %d", tc.workqueue.Len())
+    }
+}
+
 func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
     nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
     if err != nil {
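
The limiter built in this test keeps the same shape as client-go's DefaultControllerRateLimiter (the maximum of a per-item exponential failure backoff and an overall token bucket) but shrinks the backoff to a 1 ms base delay with a 5 ms cap, which is why the requeue loop can now sleep 7 ms per retry instead of 1.5 s. For comparison, a sketch that is roughly equivalent to the default limiter the production controller still uses, written with the untyped constructors; illustrative only, not code from this commit:

package main

import (
    "time"

    "golang.org/x/time/rate"
    "k8s.io/client-go/util/workqueue"
)

// defaultEquivalentRateLimiter approximates workqueue.DefaultControllerRateLimiter:
// the max of a per-item exponential failure backoff and an overall (not per-item) bucket limit.
func defaultEquivalentRateLimiter() workqueue.RateLimiter {
    return workqueue.NewMaxOfRateLimiter(
        // per-item exponential failure backoff: 5 ms base delay, capped at 1000 s
        workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
        // overall retry throughput: 10 qps with a burst of 100
        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
    )
}

func main() {
    q := workqueue.NewNamedRateLimitingQueue(defaultEquivalentRateLimiter(), "Tagging")
    defer q.ShutDown()
}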
