[WIP] Drop v1.Node from workItem #1090

Closed · wants to merge 1 commit
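In brief, the diff below drops the node *v1.Node field embedded in each queued workItem: the item now carries only the node name and provider ID, and process() resolves the current Node from the informer cache by name. A minimal sketch of that pattern, with illustrative names (item, processOne) rather than the controller's own:

package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// item carries only the keys needed to resolve the node later; it never
// pins a copy of the object the way the old workItem.node field did.
type item struct {
	name       string // informer cache key (nodes are cluster-scoped, so key == name)
	providerID string // copied at enqueue time; stable for the node's lifetime
}

// processOne looks the node up in the informer cache at processing time,
// so the action always sees the freshest cached state.
func processOne(indexer cache.Indexer, it item) error {
	obj, exists, err := indexer.GetByKey(it.name)
	if err != nil {
		return err
	}
	if !exists {
		// The node was deleted while the item sat in the queue; drop the work.
		return nil
	}
	node := obj.(*v1.Node)
	_ = node // the tag/untag action would operate on node here
	return nil
}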
69 changes: 38 additions & 31 deletions pkg/controllers/tagging/tagging_controller.go
@@ -44,14 +44,15 @@ func init() {
 
 // workItem contains the node and an action for that node
 type workItem struct {
-	node           *v1.Node
+	name           string
+	providerID     string
 	action         func(node *v1.Node) error
 	requeuingCount int
 	enqueueTime    time.Time
 }
 
 func (w workItem) String() string {
-	return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime)
+	return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.name, w.requeuingCount, w.enqueueTime)
 }
 
 const (
@@ -82,7 +83,7 @@ type Controller struct {
 	nodeInformer coreinformers.NodeInformer
 	kubeClient   clientset.Interface
 	cloud        *awsv1.Cloud
-	workqueue    workqueue.RateLimitingInterface
+	workqueue    workqueue.TypedRateLimitingInterface[*workItem]
 	nodesSynced  cache.InformerSynced
 
 	// Value controlling Controller monitoring period, i.e. how often does Controller
@@ -116,20 +117,23 @@ func NewTaggingController(
 		return nil, err
 	}
 
-	var rateLimiter workqueue.TypedRateLimiter[any]
+	var rateLimiter workqueue.TypedRateLimiter[*workItem]
 	var rateLimitEnabled bool
 	if rateLimit > 0.0 && burstLimit > 0 {
 		klog.Infof("Rate limit enabled on controller with rate %f and burst %d.", rateLimit, burstLimit)
 
 		// This is the workqueue.DefaultControllerRateLimiter() but in case where throttling is enabled on the controller,
 		// the rate and burst values are set to the provided values.
-		rateLimiter = workqueue.NewTypedMaxOfRateLimiter(
-			workqueue.NewTypedItemExponentialFailureRateLimiter[any](5*time.Millisecond, 1000*time.Second),
-			&workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(rateLimit), burstLimit)},
+		rateLimiter = workqueue.NewTypedMaxOfRateLimiter[*workItem](
+			workqueue.NewTypedItemExponentialFailureRateLimiter[*workItem](5*time.Millisecond, 1000*time.Second),
+			// 10 qps, 100 bucket size. This is only for retry speed and its only the overall factor (not per item)
+			&workqueue.TypedBucketRateLimiter[*workItem]{Limiter: rate.NewLimiter(rate.Limit(rateLimit), burstLimit)},
 		)
 
 		rateLimitEnabled = true
 	} else {
 		klog.Infof("Rate limit disabled on controller.")
-		rateLimiter = workqueue.DefaultTypedControllerRateLimiter[any]()
+		rateLimiter = workqueue.DefaultTypedControllerRateLimiter[*workItem]()
 		rateLimitEnabled = false
 	}

@@ -139,9 +143,10 @@
 		cloud:       awsCloud,
 		tags:        tags,
 		resources:   resources,
-		workqueue: workqueue.NewTypedRateLimitingQueueWithConfig[any](rateLimiter, workqueue.TypedRateLimitingQueueConfig[any]{
-			Name: TaggingControllerClientName,
-		}),
+		workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+			rateLimiter,
+			workqueue.TypedRateLimitingQueueConfig[*workItem]{Name: TaggingControllerClientName},
+		),
 		nodesSynced:       nodeInformer.Informer().HasSynced,
 		nodeMonitorPeriod: nodeMonitorPeriod,
 		rateLimitEnabled:  rateLimitEnabled,
@@ -205,31 +210,32 @@ func (tc *Controller) work() {
 // process reads each message in the queue and performs either
 // tag or untag function on the Node object
 func (tc *Controller) process() bool {
-	obj, shutdown := tc.workqueue.Get()
+	item, shutdown := tc.workqueue.Get()
 	if shutdown {
 		return false
 	}
 
-	klog.Infof("Starting to process %s", obj)
-
-	err := func(obj interface{}) error {
-		defer tc.workqueue.Done(obj)
+	klog.Infof("Starting to process %s", item)
+	node, exists, err := tc.nodeInformer.Informer().GetIndexer().GetByKey(item.name)
+	if err != nil {
+		klog.Errorf("Error occurred while getting node from informer %s", item)
+		utilruntime.HandleError(err)
+	}
+	if !exists {
+		klog.Errorf("Error occurred node missing in informer %s", item)
+		return false
+	}
 
-		workItem, ok := obj.(*workItem)
-		if !ok {
-			tc.workqueue.Forget(obj)
-			err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
-			utilruntime.HandleError(err)
-			return nil
-		}
+	err = func(workItem *workItem, node *v1.Node) error {
+		defer tc.workqueue.Done(workItem)
 
 		timeTaken := time.Since(workItem.enqueueTime).Seconds()
 		recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
 		klog.Infof("Dequeuing latency %f seconds", timeTaken)
 
-		instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
+		instanceID, err := awsv1.KubernetesInstanceID(workItem.providerID).MapToAWSInstanceID()
 		if err != nil {
-			err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
+			err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.name, err)
 			utilruntime.HandleError(err)
 			return nil
 		}
@@ -238,11 +244,11 @@ func (tc *Controller) process() bool {
 		if variant.IsVariantNode(string(instanceID)) {
 			klog.Infof("Skip processing the node %s since it is a %s node",
 				instanceID, variant.NodeType(string(instanceID)))
-			tc.workqueue.Forget(obj)
+			tc.workqueue.Forget(workItem)
 			return nil
 		}
 
-		err = workItem.action(workItem.node)
+		err = workItem.action(node)
 
 		if err != nil {
 			if workItem.requeuingCount < maxRequeuingCount {
@@ -263,12 +269,12 @@
 			klog.Infof("Processing latency %f seconds", timeTaken)
 		}
 
-		tc.workqueue.Forget(obj)
+		tc.workqueue.Forget(workItem)
 		return nil
-	}(obj)
+	}(item, node.(*v1.Node))
 
 	if err != nil {
-		klog.Errorf("Error occurred while processing %s", obj)
+		klog.Errorf("Error occurred while processing %s", item)
 		utilruntime.HandleError(err)
 	}

@@ -369,7 +375,8 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {
 // action for the object for a workitem and enqueue to the workqueue
 func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
 	item := &workItem{
-		node:           node,
+		name:           node.GetName(),
+		providerID:     node.Spec.ProviderID,
 		action:         action,
 		requeuingCount: 0,
 		enqueueTime:    time.Now(),
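The same change migrates the queue from the untyped workqueue.RateLimitingInterface to client-go's generic typed API, which is what lets process() drop the obj.(*workItem) type assertion. A self-contained sketch of that API, using a toy item type rather than the controller's workItem:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

type item struct{ name string }

func main() {
	// The typed constructor fixes the element type at compile time, so
	// Get returns *item directly instead of interface{}.
	q := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[*item](),
		workqueue.TypedRateLimitingQueueConfig[*item]{Name: "example"},
	)
	q.Add(&item{name: "node-a"})

	it, shutdown := q.Get() // it has static type *item
	if shutdown {
		return
	}
	defer q.Done(it)
	fmt.Println(it.name)
	q.Forget(it) // success: reset any rate-limit backoff for this item
}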
9 changes: 6 additions & 3 deletions pkg/controllers/tagging/tagging_controller_test.go
@@ -205,7 +205,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
 			klog.SetOutput(os.Stderr)
 		}()
 
-		clientset := fake.NewSimpleClientset(testcase.currNode)
+		clientset := fake.NewClientset(testcase.currNode)
 		informer := informers.NewSharedInformerFactory(clientset, time.Second)
 		nodeInformer := informer.Core().V1().Nodes()
@@ -221,8 +221,11 @@
 			nodeMonitorPeriod: 1 * time.Second,
 			tags:              map[string]string{"key2": "value2", "key1": "value1"},
 			resources:         []string{"instance"},
-			workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
-			rateLimitEnabled:  testcase.rateLimited,
+			workqueue: workqueue.NewTypedRateLimitingQueueWithConfig(
+				workqueue.DefaultTypedControllerRateLimiter[*workItem](),
+				workqueue.TypedRateLimitingQueueConfig[*workItem]{Name: "Tagging"},
+			),
+			rateLimitEnabled: testcase.rateLimited,
 		}
 
 		if testcase.toBeTagged {
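Two smaller points in the test diff: fake.NewClientset is, to my knowledge, the current constructor in client-go's fake package (NewSimpleClientset is deprecated in recent releases), and the test fixture now builds its queue with the same typed constructor as the controller, since the workqueue field's type changed to TypedRateLimitingInterface[*workItem].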