
fix: address disruption taint race condition
jmdeal committed Apr 10, 2024
1 parent 43da360 commit 223cbfd
Showing 1 changed file with 45 additions and 6 deletions.
pkg/controllers/disruption/controller.go (51 changes: 45 additions & 6 deletions)
@@ -58,8 +58,11 @@ type Controller struct {
 	lastRun map[string]time.Time
 }
 
-// pollingPeriod that we inspect cluster to look for opportunities to disrupt
-const pollingPeriod = 10 * time.Second
+const (
+	// pollingPeriod that we inspect cluster to look for opportunities to disrupt
+	pollingPeriod         = 10 * time.Second
+	commandExecutionDelay = 10 * time.Second
+)
 
 var errCandidateDeleting = fmt.Errorf("candidate is deleting")
 
@@ -171,7 +174,7 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
 	}
 
 	// Attempt to disrupt
-	if err := c.executeCommand(ctx, disruption, cmd, schedulingResults); err != nil {
+	if err := c.executeCommand(ctx, disruption, cmd, schedulingResults, disruption.ShouldDisrupt); err != nil {
 		return false, fmt.Errorf("disrupting candidates, %w", err)
 	}
 
@@ -182,7 +185,9 @@ func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
 // 1. Taint candidate nodes
 // 2. Spin up replacement nodes
 // 3. Add Command to orchestration.Queue to wait to delete the candidates.
-func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
+//
+//nolint:gocyclo
+func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results, candidateFilter CandidateFilter) error {
 	commandID := uuid.NewUUID()
 	logging.FromContext(ctx).With("command-id", commandID).Infof("disrupting via %s %s", m.Type(), cmd)
 
@@ -191,11 +196,45 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
 	})
 	// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
 	if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
-		return multierr.Append(fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+		return multierr.Append(
+			fmt.Errorf("tainting nodes (command-id: %s), %w", commandID, err),
+			state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...),
+		)
 	}
 
+	// After adding the NoSchedule taint to the proposed candidates, revalidate the candidates after a waiting period. This addresses a race condition
+	// where pods with the `karpenter.sh/do-not-disrupt` annotation can be scheduled after the command has been validated but before the nodes have been tainted.
+	// A waiting period is used to ensure Karpenter's internal cluster state has synced and all pods are accounted for.
+	select {
+	case <-ctx.Done():
+		return multierr.Append(fmt.Errorf("interrupted (command-id: %s)", commandID), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	case <-c.clock.After(commandExecutionDelay):
+	}
+
+	currentCandidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, candidateFilter, c.queue)
+	if err != nil {
+		return multierr.Append(fmt.Errorf("validating candidates (command-id: %s), %w", commandID, err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	}
+	currentCandidates = mapCandidates(cmd.candidates, currentCandidates)
+	if len(currentCandidates) != len(cmd.candidates) {
+		logging.FromContext(ctx).With("command-id", commandID).Infof("abandoning disruption attempt, %d out of %d candidates are no longer valid", len(cmd.candidates)-len(currentCandidates), len(cmd.candidates))
+		return state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)
+	}
+
+	// Rebuild the disruption budget mapping to see if any budgets have changed since revalidation.
+	currentBudgetMapping, err := BuildDisruptionBudgets(ctx, c.cluster, c.clock, c.kubeClient, c.recorder)
+	if err != nil {
+		return multierr.Append(fmt.Errorf("building disruption budgets, %w", err), state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...))
+	}
+	for _, candidate := range currentCandidates {
+		if currentBudgetMapping[candidate.nodePool.Name] == 0 {
+			logging.FromContext(ctx).With("command-id", commandID).Infof("abandoning disruption attempt, no longer allowed by disruption budgets")
+			return state.RequireNoScheduleTaint(ctx, c.kubeClient, false, stateNodes...)
+		}
+		currentBudgetMapping[candidate.nodePool.Name]--
+	}
+
 	var nodeClaimNames []string
-	var err error
 	if len(cmd.replacements) > 0 {
 		if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
 			// If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
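For readers skimming the change, the new flow in executeCommand is: taint the candidate nodes, wait commandExecutionDelay for Karpenter's internal cluster state to sync, then revalidate the candidates and their disruption budgets, rolling the taints back and abandoning the command if anything no longer holds. Below is a minimal, self-contained Go sketch of that taint-wait-revalidate pattern, not the Karpenter code itself; the names (executeWithRevalidation, taintFn, revalidateFn) are hypothetical, and the standard library's errors.Join stands in for multierr.Append.

// Sketch of the taint-wait-revalidate pattern; all types and names are illustrative.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// taintFn applies (add=true) or removes (add=false) a NoSchedule taint on the candidates.
type taintFn func(ctx context.Context, add bool, candidates []string) error

// revalidateFn re-fetches the candidates that are still disruptable after the delay.
type revalidateFn func(ctx context.Context) ([]string, error)

// executeWithRevalidation mirrors the control flow the commit adds to executeCommand.
func executeWithRevalidation(ctx context.Context, candidates []string, delay time.Duration, taint taintFn, revalidate revalidateFn) error {
	// 1. Taint first so no new pods (e.g. with karpenter.sh/do-not-disrupt) land on the nodes.
	if err := taint(ctx, true, candidates); err != nil {
		return errors.Join(fmt.Errorf("tainting nodes, %w", err), taint(ctx, false, candidates))
	}

	// 2. Wait out the sync window, but still honor context cancellation.
	select {
	case <-ctx.Done():
		return errors.Join(errors.New("interrupted"), taint(ctx, false, candidates))
	case <-time.After(delay):
	}

	// 3. Revalidate; if any candidate dropped out, undo the taints and abandon the command.
	current, err := revalidate(ctx)
	if err != nil {
		return errors.Join(fmt.Errorf("revalidating candidates, %w", err), taint(ctx, false, candidates))
	}
	if len(current) != len(candidates) {
		fmt.Printf("abandoning disruption attempt, %d of %d candidates no longer valid\n", len(candidates)-len(current), len(candidates))
		return taint(ctx, false, candidates)
	}
	return nil
}

func main() {
	candidates := []string{"node-a", "node-b"}
	err := executeWithRevalidation(context.Background(), candidates, 10*time.Millisecond,
		func(ctx context.Context, add bool, c []string) error { return nil },
		// Mock revalidation: one candidate has become invalid, so the command is abandoned.
		func(ctx context.Context) ([]string, error) { return candidates[:1], nil },
	)
	fmt.Println("result:", err)
}

Note that in the actual diff every early return appends another RequireNoScheduleTaint call with taint removal, so an abandoned or interrupted command never leaves candidate nodes cordoned; the budget recheck loop follows the same rollback convention.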
