From 223cbfdf808cc4fc00c67d3b70a39f9f8e0cceab Mon Sep 17 00:00:00 2001
From: Jason Deal
Date: Tue, 9 Apr 2024 20:01:14 -0700
Subject: [PATCH] fix: address disruption taint race condition

---
 pkg/controllers/disruption/controller.go | 51 +++++++++++++++++++++---
 1 file changed, 45 insertions(+), 6 deletions(-)

diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go
index b3c33a6c7d..c1d1353dff 100644
--- a/pkg/controllers/disruption/controller.go
+++ b/pkg/controllers/disruption/controller.go
@@ -58,8 +58,11 @@ type Controller struct {
 	lastRun map[string]time.Time
 }
 
-// pollingPeriod that we inspect cluster to look for opportunities to disrupt
-const pollingPeriod = 10 * time.Second
+const (
+	// pollingPeriod that we inspect cluster to look for opportunities to disrupt
+	pollingPeriod         = 10 * time.Second
+	commandExecutionDelay = 10 * time.Second
+)
 
 var errCandidateDeleting = fmt.Errorf("candidate is deleting")
 
@@ -171,7 +174,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
 	}
 
 	// Attempt to disrupt
-	if err := c.executeCommand(ctx, disruption, cmd, schedulingResults); err != nil {
+	if err := c.executeCommand(ctx, disruption, cmd, schedulingResults, disruption.ShouldDisrupt); err != nil {
 		return false, fmt.Errorf("disrupting candidates, %w", err)
 	}
 
@@ -182,7 +185,9 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, erro
 // 1. Taint candidate nodes
 // 2. Spin up replacement nodes
 // 3. Add Command to orchestration.Queue to wait to delete the candiates.
-func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
+//
+//nolint:gocyclo
+func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results, candidateFilter CandidateFilter) error {
 	commandID := uuid.NewUUID()
 	logging.FromContext(ctx).With("command-id", commandID).Infof("disrupting via %s %s", m.Type(), cmd)
 
@@ -191,11 +196,45 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
 	})
 	// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
 	if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
-		return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+		return multierr.Append(
+			fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err),
+			state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...),
+		)
+	}
+
+	// After adding the NoSchedule taint to the proposed candidates, revalidate the candidates after a waiting period. This addresses a race condition
+	// where pods with the `karpenter.sh/do-not-disrupt` annotation can be scheduled after the command has been validated but before the nodes have been tainted.
+	// A waiting period is used to ensure Karpenter's internal cluster state has synced and all pods are accounted for.
+	select {
+	case <-ctx.Done():
+		return multierr.Append(fmt.Errorf("interrupted (command-id: %s)", commandID), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	case <-c.clock.After(commandExecutionDelay):
+	}
+
+	currentCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, candidateFilter, c.queue)
+	if err != nil {
+		return multierr.Append(fmt.Errorf("validating candidates (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	}
+	currentCandidates = mapCandidates(cmd.candidates, currentCandidates)
+	if len(currentCandidates) != len(cmd.candidates) {
+		logging.FromContext(ctx).With("command-id", commandID).Infof("abandoning disruption attempt, %d out of %d candidates are no longer valid", len(cmd.candidates)-len(currentCandidates), len(cmd.candidates))
+		return state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)
+	}
+
+	// Rebuild the disruption budget mapping to check whether any budgets have changed since the command was initially validated.
+	currentBudgetMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
+	if err != nil {
+		return multierr.Append(fmt.Errorf("building disruption budgets, %w", err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	}
+	for _, candidate := range currentCandidates {
+		if currentBudgetMapping[candidate.nodePool.Name] == 0 {
+			logging.FromContext(ctx).With("command-id", commandID).Infof("abandoning disruption attempt, nodepool %s no longer allows disruptions per its disruption budget", candidate.nodePool.Name)
+			return state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)
+		}
+		currentBudgetMapping[candidate.nodePool.Name]--
+	}
 
 	var nodeClaimNames []string
-	var err error
 	if len(cmd.replacements) > 0 {
 		if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
 			// If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
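
Note for reviewers: the control flow this patch adds can be summarized as taint -> wait -> revalidate -> untaint on abandon. The minimal, self-contained Go sketch below is illustrative only; runGuarded, taint, untaint, revalidate, and executionDelay are hypothetical stand-ins introduced here for explanation, while the patch itself relies on state.RequireNoScheduleTaint, GetCandidates, and BuildDisruptionBudgets as shown above.

package sketch

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// executionDelay mirrors the role of commandExecutionDelay in the patch above (assumed value).
const executionDelay = 10 * time.Second

// runGuarded taints the candidate nodes, waits for cluster state to settle,
// revalidates the command, and rolls the taints back if it can no longer proceed.
func runGuarded(
	ctx context.Context,
	taint, untaint func(context.Context) error,
	revalidate func(context.Context) (bool, error),
) error {
	if err := taint(ctx); err != nil {
		// Best-effort untaint before surfacing the tainting error.
		return errors.Join(fmt.Errorf("tainting nodes: %w", err), untaint(ctx))
	}

	// Wait so that pods scheduled just before the taints landed (for example, pods
	// carrying the karpenter.sh/do-not-disrupt annotation) show up in cluster state.
	select {
	case <-ctx.Done():
		return errors.Join(fmt.Errorf("interrupted: %w", ctx.Err()), untaint(ctx))
	case <-time.After(executionDelay):
	}

	ok, err := revalidate(ctx)
	if err != nil {
		return errors.Join(fmt.Errorf("revalidating candidates: %w", err), untaint(ctx))
	}
	if !ok {
		// The command is no longer valid (a candidate changed or a budget is exhausted); abandon it.
		return untaint(ctx)
	}
	return nil // safe to proceed with replacements and termination
}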