Skip to content

Commit

Permalink
[node-labeler] Introduce workspace count controller (#20509)
Browse files Browse the repository at this point in the history
* [ws-daemon] Introduce pod count controller

* remove unnecessary variable

* move to `node-labeler`

* act on ws crds

* Fix runtime not filled in yet

* Make tests pass!

* Improve test file structure

* Fix `node-labeler:lib` build

* Remove unnecessary changes

* Address some review comments (thanks, kyle!)

* Try caching?

* Queue deleted nodes and periodically reconcile it all

* WCC cleanup function

* Fix tests

* Update name

* Add metrics for controller

* Add synchronization for node reconciliation to prevent race conditions

* Address review comments

* Remove superfluous log

* Remove unneeded metrics and add cool log line

* big yellow warning for a thing that should not happen
  • Loading branch information
filiptronicek authored Jan 21, 2025
1 parent 76781bf commit 4d3cca4
Show file tree
Hide file tree
Showing 8 changed files with 590 additions and 13 deletions.
10 changes: 10 additions & 0 deletions components/node-labeler/BUILD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
env:
- CGO_ENABLED=0
- GOOS=linux
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
config:
packaging: app
buildCommand: ["go", "build", "-trimpath", "-ldflags", "-buildid= -w -s -X 'github.com/gitpod-io/gitpod/node-labeler/cmd.Version=commit-${__git_commit}'"]
Expand All @@ -34,5 +39,10 @@ packages:
- "**/*.go"
- "go.mod"
- "go.sum"
- "crd/*.yaml"
deps:
- components/common-go:lib
- components/ws-manager-api/go:lib
- components/ws-manager-mk2:crd
prep:
- ["mv", "_deps/components-ws-manager-mk2--crd/workspace.gitpod.io_workspaces.yaml", "crd/workspace.gitpod.io_workspaces.yaml"]
2 changes: 1 addition & 1 deletion components/node-labeler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
// rootCmd represents the base command when called without any subcommands
var rootCmd = &cobra.Command{
Use: ServiceName,
Short: "node-labeler is in charge of maintining the node labels that workspaces require to run in a node",
Short: "node-labeler is in charge of maintaining the node labels that workspaces require to run in a node",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.Init(ServiceName, Version, jsonLog, verbose)
},
Expand Down
246 changes: 241 additions & 5 deletions components/node-labeler/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/bombsimon/logrusr/v2"
workspacev1 "github.com/gitpod-io/gitpod/ws-manager/api/crd/v1"
"github.com/spf13/cobra"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -31,7 +32,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -78,16 +81,16 @@ var runCmd = &cobra.Command{
LeaderElectionID: "node-labeler.gitpod.io",
})
if err != nil {
log.WithError(err).Fatal("unable to start node-labeber")
log.WithError(err).Fatal("unable to start node-labeler")
}

client, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
kClient, err := client.New(ctrl.GetConfigOrDie(), client.Options{})
if err != nil {
log.WithError(err).Fatal("unable to create client")
}

r := &PodReconciler{
client,
kClient,
}

componentPredicate, err := predicate.LabelSelectorPredicate(metav1.LabelSelector{
Expand All @@ -110,6 +113,36 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to bind controller watch event handler")
}

if err := mgr.GetFieldIndexer().IndexField(context.Background(), &workspacev1.Workspace{}, "status.runtime.nodeName", func(o client.Object) []string {
ws := o.(*workspacev1.Workspace)
if ws.Status.Runtime == nil {
return nil
}
return []string{ws.Status.Runtime.NodeName}
}); err != nil {
log.WithError(err).Fatal("unable to create workspace indexer")
return
}

nsac, err := NewNodeScaledownAnnotationController(mgr.GetClient())
if err != nil {
log.WithError(err).Fatal("unable to create node scaledown annotation controller")
}
err = nsac.SetupWithManager(mgr)
if err != nil {
log.WithError(err).Fatal("unable to bind node scaledown annotation controller")
}

err = mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
<-ctx.Done()
log.Info("Received shutdown signal - stopping NodeScaledownAnnotationController")
nsac.Stop()
return nil
}))
if err != nil {
log.WithError(err).Fatal("couldn't properly clean up node scaledown annotation controller")
}

metrics.Registry.MustRegister(NodeLabelerCounterVec)
metrics.Registry.MustRegister(NodeLabelerTimeHistVec)

Expand All @@ -123,10 +156,10 @@ var runCmd = &cobra.Command{
log.WithError(err).Fatal("unable to set up ready check")
}

log.Info("starting node-labeber")
log.Info("starting node-labeler")
err = mgr.Start(ctrl.SetupSignalHandler())
if err != nil {
log.WithError(err).Fatal("problem running node-labeber")
log.WithError(err).Fatal("problem running node-labeler")
}

log.Info("Received SIGINT - shutting down")
Expand All @@ -135,6 +168,8 @@ var runCmd = &cobra.Command{

// init adds the client-go built-in types and the workspace v1 API types to
// scheme, then attaches runCmd to rootCmd as a subcommand.
func init() {
// utilruntime.Must panics if adding the types to the scheme fails.
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(workspacev1.AddToScheme(scheme))

rootCmd.AddCommand(runCmd)
}

Expand Down Expand Up @@ -249,6 +284,207 @@ func (r *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r
return reconcile.Result{}, nil
}

// NodeScaledownAnnotationController reacts to Workspace events and keeps the
// "cluster-autoscaler.kubernetes.io/scale-down-disabled" annotation on each node
// in line with the number of workspaces indexed to that node.
type NodeScaledownAnnotationController struct {
// Client is the embedded Kubernetes client used to get, list and update objects.
client.Client
// nodesToReconcile is the queue of node names awaiting reconciliation; it is
// filled by queueNodeForReconciliation and drained by reconciliationWorker.
nodesToReconcile chan string
// stopChan is closed by Stop to make the background goroutines return.
stopChan chan struct{}
}

// NewNodeScaledownAnnotationController builds a controller around the given
// client. The node queue is buffered to hold up to 1000 pending node names and
// the stop channel is left open until Stop is called. The returned error is
// always nil.
func NewNodeScaledownAnnotationController(client client.Client) (*NodeScaledownAnnotationController, error) {
	return &NodeScaledownAnnotationController{
		Client:           client,
		nodesToReconcile: make(chan string, 1000),
		stopChan:         make(chan struct{}),
	}, nil
}

// SetupWithManager launches the two background goroutines (queue worker and
// periodic full reconciliation) and then registers this controller with mgr as
// a reconciler for Workspace objects, restricted by workspaceFilter.
//
// It returns whatever error the controller builder reports.
func (c *NodeScaledownAnnotationController) SetupWithManager(mgr ctrl.Manager) error {
	// The background loops are started before the controller is built; they run
	// until Stop closes stopChan.
	go c.reconciliationWorker()
	go c.periodicReconciliation()

	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.Named("node-scaledown-annotation-controller")
	bldr = bldr.For(&workspacev1.Workspace{})
	bldr = bldr.WithEventFilter(c.workspaceFilter())

	return bldr.Complete(c)
}

// periodicReconciliation queues every node in the cluster for reconciliation
// once every five minutes. It blocks until stopChan is closed, so it is meant
// to run in its own goroutine.
func (c *NodeScaledownAnnotationController) periodicReconciliation() {
	const interval = 5 * time.Minute

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		// Wait for either the next tick or the stop signal.
		select {
		case <-c.stopChan:
			log.Info("stopping periodic full reconciliation")
			return
		case <-tick.C:
		}

		log.Info("starting periodic full reconciliation")
		_, err := c.reconcileAllNodes(context.Background())
		if err != nil {
			log.WithError(err).Error("periodic reconciliation failed")
		}
	}
}

// reconciliationWorker drains nodesToReconcile, reconciling one node at a time.
// A failed reconciliation is logged and the loop carries on with the next node.
// The worker returns once stopChan is closed.
func (c *NodeScaledownAnnotationController) reconciliationWorker() {
	log.Info("reconciliation worker started")

	for {
		var nodeName string

		// Block until there is work to do or we are asked to stop.
		select {
		case <-c.stopChan:
			log.Info("reconciliation worker stopping")
			return
		case nodeName = <-c.nodesToReconcile:
		}

		err := c.reconcileNode(context.Background(), nodeName)
		if err == nil {
			continue
		}
		log.WithError(err).WithField("node", nodeName).Error("failed to reconcile node from queue")
	}
}

// workspaceFilter returns the predicate that decides which Workspace events
// reach Reconcile.
//
//   - Create: passes only when the workspace already has a non-empty node name.
//   - Update: passes when runtime info with a node name appears for the first
//     time, or when the node name differs between the old and new object. If
//     the old object had a node name, that node is queued directly so its
//     annotation gets re-evaluated too.
//   - Delete: passes (and queues the node) when the deleted workspace had a
//     non-empty node name.
//
// Note that the Update and Delete handlers have the side effect of pushing
// node names onto the reconciliation queue.
func (c *NodeScaledownAnnotationController) workspaceFilter() predicate.Predicate {
	onCreate := func(e event.CreateEvent) bool {
		ws := e.Object.(*workspacev1.Workspace)
		rt := ws.Status.Runtime
		if rt == nil {
			log.WithField("workspace", ws.Name).Info("workspace not ready yet")
			return false
		}

		return rt.NodeName != ""
	}

	onUpdate := func(e event.UpdateEvent) bool {
		previous := e.ObjectOld.(*workspacev1.Workspace)
		current := e.ObjectNew.(*workspacev1.Workspace)
		oldRT, newRT := previous.Status.Runtime, current.Status.Runtime

		// Without runtime info on the new object there is nothing to act on.
		if newRT == nil {
			return false
		}

		var relevant bool
		if oldRT == nil {
			// we just got runtime info
			relevant = newRT.NodeName != ""
		} else {
			// node name changed
			relevant = oldRT.NodeName != newRT.NodeName
		}
		if !relevant {
			return false
		}

		// The node the workspace used to be on needs a fresh count as well.
		if oldRT != nil && oldRT.NodeName != "" {
			c.queueNodeForReconciliation(oldRT.NodeName)
		}
		return true
	}

	onDelete := func(e event.DeleteEvent) bool {
		ws := e.Object.(*workspacev1.Workspace)
		rt := ws.Status.Runtime
		if rt == nil || rt.NodeName == "" {
			return false
		}

		c.queueNodeForReconciliation(rt.NodeName)
		return true
	}

	return predicate.Funcs{
		CreateFunc: onCreate,
		UpdateFunc: onUpdate,
		DeleteFunc: onDelete,
	}
}

// queueNodeForReconciliation tries to enqueue nodeName without blocking. When
// the queue's buffer is full the node is dropped and a warning is logged.
func (c *NodeScaledownAnnotationController) queueNodeForReconciliation(nodeName string) {
	entry := log.WithField("node", nodeName)

	select {
	case c.nodesToReconcile <- nodeName:
		entry.Info("queued node for reconciliation")
	default:
		// Non-blocking send: never stall the caller on a full queue.
		entry.Warn("reconciliation queue full")
	}
}

// Reconcile handles a Workspace event that passed workspaceFilter. It fetches
// the workspace and, if it has been assigned to a node, queues that node so the
// worker can recompute its scale-down annotation.
//
// A workspace that no longer exists is not an error: Reconcile returns without
// requeueing. Any other fetch error is logged and returned so the request is
// retried. A workspace without runtime info or node name should have been
// rejected by workspaceFilter; reaching that case only emits a warning.
func (c *NodeScaledownAnnotationController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log.WithField("request", req.NamespacedName.String()).Info("WorkspaceCountController reconciling")

	var ws workspacev1.Workspace
	if err := c.Get(ctx, req.NamespacedName, &ws); err != nil {
		if errors.IsNotFound(err) {
			// Nothing to look up for a workspace that is already gone.
			return ctrl.Result{}, nil
		}
		log.WithError(err).WithField("workspace", req.NamespacedName).Error("unable to fetch Workspace")
		return ctrl.Result{}, err
	}

	if ws.Status.Runtime != nil && ws.Status.Runtime.NodeName != "" {
		c.queueNodeForReconciliation(ws.Status.Runtime.NodeName)
		// Return here: previously execution fell through and logged the
		// "no Runtime/NodeName" warning for every successfully queued workspace.
		return ctrl.Result{}, nil
	}

	log.WithField("runtime", ws.Status.Runtime).Warn("reconciling object with no Runtime/NodeName, which wasn't filtered out by workspaceFilter")
	return ctrl.Result{}, nil
}

// Stop shuts the controller's background goroutines down by closing stopChan.
// It is meant to be called once while shutting down; closing an already-closed
// channel panics, so a second call is not safe.
func (c *NodeScaledownAnnotationController) Stop() {
	close(c.stopChan)
}

// reconcileAllNodes lists every node and queues each one for reconciliation.
// The returned result is always empty; the error is non-nil only when listing
// the nodes fails (the failure is also logged here).
func (c *NodeScaledownAnnotationController) reconcileAllNodes(ctx context.Context) (ctrl.Result, error) {
	var nodeList corev1.NodeList

	err := c.List(ctx, &nodeList)
	if err != nil {
		log.WithError(err).Error("failed to list nodes")
		return ctrl.Result{}, err
	}

	for i := range nodeList.Items {
		c.queueNodeForReconciliation(nodeList.Items[i].Name)
	}

	return ctrl.Result{}, nil
}

// reconcileNode counts the workspaces indexed under nodeName (via the
// "status.runtime.nodeName" field index) and hands that count to
// updateNodeAnnotation.
//
// It returns a wrapped error if listing the workspaces fails, otherwise the
// result of updating the node annotation.
func (c *NodeScaledownAnnotationController) reconcileNode(ctx context.Context, nodeName string) error {
	byNode := client.MatchingFields{"status.runtime.nodeName": nodeName}

	var workspaces workspacev1.WorkspaceList
	if err := c.List(ctx, &workspaces, byNode); err != nil {
		return fmt.Errorf("failed to list workspaces: %w", err)
	}

	count := len(workspaces.Items)
	log.WithField("node", nodeName).WithField("count", count).Info("acting on workspaces")

	return c.updateNodeAnnotation(ctx, nodeName, count)
}

// updateNodeAnnotation makes the node's scale-down-disabled annotation match
// the workspace count: the annotation is set to "true" when count > 0 and
// removed otherwise. The node is only written when its current state differs
// from the desired one.
//
// Each attempt fetches the node afresh with a 5 second timeout, and the whole
// operation is retried on update conflicts using retry.DefaultBackoff.
func (c *NodeScaledownAnnotationController) updateNodeAnnotation(ctx context.Context, nodeName string, count int) error {
	const scaleDownDisabledKey = "cluster-autoscaler.kubernetes.io/scale-down-disabled"

	attempt := func() error {
		opCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()

		var node corev1.Node
		if err := c.Get(opCtx, types.NamespacedName{Name: nodeName}, &node); err != nil {
			return fmt.Errorf("obtaining node %s: %w", nodeName, err)
		}

		wantDisabled := count > 0
		// A missing annotation (or nil map) reads as "", i.e. not disabled.
		isDisabled := node.Annotations[scaleDownDisabledKey] == "true"

		// Only update if the state needs to change
		if wantDisabled == isDisabled {
			return nil
		}

		if node.Annotations == nil {
			node.Annotations = make(map[string]string)
		}

		switch {
		case wantDisabled:
			node.Annotations[scaleDownDisabledKey] = "true"
			log.WithField("nodeName", nodeName).Info("disabling scale-down for node")
		default:
			delete(node.Annotations, scaleDownDisabledKey)
			log.WithField("nodeName", nodeName).Info("enabling scale-down for node")
		}

		return c.Update(opCtx, &node)
	}

	return retry.RetryOnConflict(retry.DefaultBackoff, attempt)
}

func updateLabel(label string, add bool, nodeName string, client client.Client) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand Down
Loading

0 comments on commit 4d3cca4

Please sign in to comment.