/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package controllers
import (
"context"
"encoding/json"
"errors"
solrv1beta1 "github.com/apache/solr-operator/api/v1beta1"
"github.com/apache/solr-operator/controllers/util"
"github.com/apache/solr-operator/controllers/util/solr_api"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"net/url"
"sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"time"
)
// SolrClusterOp contains metadata for cluster operations performed on SolrClouds.
type SolrClusterOp struct {
// The type of Cluster Operation
Operation SolrClusterOperationType `json:"operation"`
// Time that the Cluster Operation was started or re-started
LastStartTime metav1.Time `json:"lastStartTime"`
// Metadata for the Cluster Operation, e.g. the scaling target or serialized operation options
Metadata string `json:"metadata"`
}
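// As a hedged illustration (not an API guarantee), a SolrClusterOp serialized
// by setClusterOpLock below might look like the following JSON, where
// "metadata" holds a scale-down target produced via strconv.Itoa and the
// timestamp value is made up:
//
//    {"operation":"ScalingDown","lastStartTime":"2024-01-01T00:00:00Z","metadata":"3"}
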
type SolrClusterOperationType string
const (
ScaleDownLock SolrClusterOperationType = "ScalingDown"
ScaleUpLock SolrClusterOperationType = "ScalingUp"
UpdateLock SolrClusterOperationType = "RollingUpdate"
BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
PvcExpansionLock SolrClusterOperationType = "PVCExpansion"
)
// RollingUpdateMetadata contains metadata for rolling update cluster operations.
type RollingUpdateMetadata struct {
// Whether or not replicas will be migrated during this rolling upgrade
RequiresReplicaMigration bool `json:"requiresReplicaMigration"`
}
func clearClusterOpLock(statefulSet *appsv1.StatefulSet) {
delete(statefulSet.Annotations, util.ClusterOpsLockAnnotation)
}
func setClusterOpLock(statefulSet *appsv1.StatefulSet, op SolrClusterOp) error {
op.LastStartTime = metav1.Now()
bytes, err := json.Marshal(op)
if err != nil {
return err
}
statefulSet.Annotations[util.ClusterOpsLockAnnotation] = string(bytes)
return nil
}
func setClusterOpRetryQueue(statefulSet *appsv1.StatefulSet, queue []SolrClusterOp) error {
if len(queue) > 0 {
bytes, err := json.Marshal(queue)
if err != nil {
return err
}
statefulSet.Annotations[util.ClusterOpsRetryQueueAnnotation] = string(bytes)
} else {
delete(statefulSet.Annotations, util.ClusterOpsRetryQueueAnnotation)
}
return nil
}
func GetCurrentClusterOp(statefulSet *appsv1.StatefulSet) (clusterOp *SolrClusterOp, err error) {
if op, hasOp := statefulSet.Annotations[util.ClusterOpsLockAnnotation]; hasOp {
clusterOp = &SolrClusterOp{}
err = json.Unmarshal([]byte(op), clusterOp)
}
return
}
func GetClusterOpRetryQueue(statefulSet *appsv1.StatefulSet) (clusterOpQueue []SolrClusterOp, err error) {
if op, hasOp := statefulSet.Annotations[util.ClusterOpsRetryQueueAnnotation]; hasOp {
err = json.Unmarshal([]byte(op), &clusterOpQueue)
}
return
}
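// For illustration only: the retry queue annotation stores a JSON array of
// SolrClusterOp objects, consumed in FIFO order by retryNextQueuedClusterOp,
// e.g. (values made up):
//
//    [{"operation":"ScalingUp","lastStartTime":"2024-01-01T00:00:00Z","metadata":"5"}]
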
func enqueueCurrentClusterOpForRetry(statefulSet *appsv1.StatefulSet) (hasOp bool, err error) {
clusterOp, err := GetCurrentClusterOp(statefulSet)
if err != nil || clusterOp == nil {
return false, err
}
clusterOpRetryQueue, err := GetClusterOpRetryQueue(statefulSet)
if err != nil {
return true, err
}
clusterOpRetryQueue = append(clusterOpRetryQueue, *clusterOp)
clearClusterOpLock(statefulSet)
return true, setClusterOpRetryQueue(statefulSet, clusterOpRetryQueue)
}
func retryNextQueuedClusterOp(statefulSet *appsv1.StatefulSet) (hasOp bool, err error) {
clusterOpRetryQueue, err := GetClusterOpRetryQueue(statefulSet)
if err != nil {
return hasOp, err
}
hasOp = len(clusterOpRetryQueue) > 0
if len(clusterOpRetryQueue) > 0 {
nextOp := clusterOpRetryQueue[0]
err = setClusterOpLock(statefulSet, nextOp)
if err != nil {
return hasOp, err
}
err = setClusterOpRetryQueue(statefulSet, clusterOpRetryQueue[1:])
}
return hasOp, err
}
func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterOpQueue []SolrClusterOp) (hasOp bool, err error) {
hasOp = len(clusterOpQueue) > 0
if len(clusterOpQueue) > 0 {
nextOp := clusterOpQueue[0]
err = setClusterOpLock(statefulSet, nextOp)
if err != nil {
return hasOp, err
}
err = setClusterOpRetryQueue(statefulSet, clusterOpQueue[1:])
}
return hasOp, err
}
func determinePvcExpansionClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
if instance.Spec.StorageOptions.PersistentStorage != nil &&
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage() != nil &&
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage().String() != statefulSet.Annotations[util.StorageMinimumSizeAnnotation] {
// First make sure that the new Storage request is greater than what already is set.
// PVCs cannot be shrunk
newSize := instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage()
// If there is no old size to update, the StatefulSet can be just set to use the new PVC size without any issue.
// Only do a cluster operation if we are expanding from an existing size to a new size
if oldSizeStr, hasOldSize := statefulSet.Annotations[util.StorageMinimumSizeAnnotation]; hasOldSize {
if oldSize, e := resource.ParseQuantity(oldSizeStr); e != nil {
err = e
// TODO: add an event
} else {
// Only update to the new size if it is bigger, we cannot shrink PVCs
if newSize.Cmp(oldSize) > 0 {
clusterOp = &SolrClusterOp{
Operation: PvcExpansionLock,
Metadata: newSize.String(),
}
}
// TODO: add an event saying that we cannot shrink PVCs
}
}
}
return
}
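// For reference, the quantity comparison above behaves as in this hedged
// sketch (the sizes are made up; resource.MustParse panics on bad input, so
// the real code above uses ParseQuantity instead):
//
//    oldSize := resource.MustParse("10Gi")
//    newSize := resource.MustParse("20Gi")
//    newSize.Cmp(oldSize) > 0 // true  -> an expansion, so the cluster op starts
//    smaller := resource.MustParse("5Gi")
//    smaller.Cmp(oldSize) > 0 // false -> shrink requests are ignored, PVCs cannot shrink
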
// handlePvcExpansion does the logic of a "locked" PVC expansion operation.
// This will likely take many reconcile loops to complete, as it waits for all PVCs to be resized.
func handlePvcExpansion(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, logger logr.Logger) (operationComplete bool, retryLaterDuration time.Duration, err error) {
var newSize resource.Quantity
newSize, err = resource.ParseQuantity(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert PvcExpansion metadata to a resource.Quantity, as it represents the new size of PVCs", "metadata", clusterOp.Metadata)
return
}
operationComplete, err = r.expandPVCs(ctx, instance, statefulSet.Spec.Selector.MatchLabels, newSize, logger)
if err == nil && operationComplete {
originalStatefulSet := statefulSet.DeepCopy()
statefulSet.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
statefulSet.Spec.Template.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to set the new minimum PVC size after PVCs the completion of PVC resizing", "newSize", newSize)
operationComplete = false
}
// Return and wait for the StatefulSet to be updated which will call the reconcile to start the rolling restart
retryLaterDuration = 0
} else if err == nil {
retryLaterDuration = time.Second * 5
}
return
}
func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
if desiredPods != configuredPods {
// We do not do a "managed" scale-to-zero operation.
// Only do a managed scale down if the desiredPods is positive.
// The VacatePodsOnScaleDown option is enabled by default, so treat "nil" like "true"
if desiredPods < configuredPods && desiredPods > 0 &&
(instance.Spec.Scaling.VacatePodsOnScaleDown == nil || *instance.Spec.Scaling.VacatePodsOnScaleDown) {
if len(podList) > configuredPods {
// There are too many pods, the statefulSet controller has yet to delete unwanted pods.
// Do not start the scale down until these extra pods are deleted.
return nil, time.Second * 5, nil
}
clusterOp = &SolrClusterOp{
Operation: ScaleDownLock,
Metadata: strconv.Itoa(configuredPods - 1),
}
} else if desiredPods > configuredPods && (instance.Spec.Scaling.PopulatePodsOnScaleUp == nil || *instance.Spec.Scaling.PopulatePodsOnScaleUp) {
// We need to wait for all pods to become healthy to scale up in a managed fashion, otherwise
// the balancing will skip some pods
if len(podList) < configuredPods {
// There are not enough pods, the statefulSet controller has yet to create the previously desired pods.
// Do not start the scale up until these missing pods are created.
return nil, time.Second * 5, nil
}
// If Solr nodes are advertised by their individual node services (through an ingress)
// then make sure that the host aliases are set for all desired pods before starting a scale-up.
// If the host aliases do not already include the soon-to-be created pods, then Solr might not be able to balance
// replicas onto the new hosts.
// We need to make sure that the StatefulSet is updated with these new hostAliases before the scale up occurs.
if instance.UsesIndividualNodeServices() && instance.Spec.SolrAddressability.External.UseExternalAddress {
for _, pod := range podList {
if len(pod.Spec.HostAliases) < desiredPods {
return nil, time.Second * 5, nil
}
}
}
clusterOp = &SolrClusterOp{
Operation: ScaleUpLock,
Metadata: strconv.Itoa(desiredPods),
}
} else {
err = scaleCloudUnmanaged(ctx, r, statefulSet, desiredPods, logger)
}
} else if scaleDownOpIsQueued {
// If the statefulSet and the solrCloud have the same number of pods configured, and the queued operation is a scaleDown,
// that means the scaleDown was reverted. So there's no reason to change the number of pods.
// However, a Replica Balancing should be done just in case, so start it via a new ClusterOperation.
clusterOp = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "UndoFailedScaleDown",
}
}
return
}
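// To summarize the decision above (illustrative, assuming the default managed
// scaling options):
//
//    spec.replicas < sts.replicas (and > 0)  -> ScaleDownLock, metadata = sts.replicas - 1
//    spec.replicas > sts.replicas            -> ScaleUpLock,   metadata = spec.replicas
//    managed scaling disabled or scale-to-0  -> unmanaged patch via scaleCloudUnmanaged
//    equal, but a scale-down had been queued -> BalanceReplicasLock ("UndoFailedScaleDown")
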
// handleManagedCloudScaleDown does the logic of a managed and "locked" cloud scale down operation.
// This will likely take many reconcile loops to complete, as it is moving replicas away from the pods that will be scaled down.
func handleManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, err error) {
var scaleDownTo int
if scaleDownTo, err = strconv.Atoi(clusterOp.Metadata); err != nil {
logger.Error(err, "Could not convert ScaleDown metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
// TODO: Create event for the CRD.
return
}
if len(podList) <= scaleDownTo {
// The number of pods is already at or below the scaleDown target, so we are done
return true, false, 0, nil
}
if int(*statefulSet.Spec.Replicas) <= scaleDownTo {
// We've done everything we need to do at this point. We just need to wait until the pods are deleted to be "done".
// So return and wait for the next reconcile loop, whenever it happens
return false, false, time.Second, nil
}
// TODO: It would be great to support a multi-node scale down when Solr supports evicting many SolrNodes at once.
if int(*statefulSet.Spec.Replicas) > scaleDownTo+1 {
// This shouldn't happen, but we don't want to be stuck if it does.
// Just remove the clusterOp, because the cluster is bigger than it should be.
// The whole operation will be retried, with the right metadata this time.
return true, false, time.Second, nil
}
// Before doing anything to the pod, make sure that users cannot send requests to the pod anymore.
podStoppedReadinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
util.SolrIsNotStoppedReadinessCondition: {
reason: ScaleDown,
message: "Pod is being deleted, traffic to the pod must be stopped",
status: false,
},
}
// Only evict the last pod, even if we are trying to scale down multiple pods.
// Scale down will happen one pod at a time.
var replicaManagementComplete bool
if replicaManagementComplete, requestInProgress, err = evictSinglePod(ctx, r, instance, scaleDownTo, podList, podStoppedReadinessConditions, logger); err == nil {
if replicaManagementComplete {
originalStatefulSet := statefulSet.DeepCopy()
statefulSet.Spec.Replicas = pointer.Int32(int32(scaleDownTo))
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to scale down pods after eviction", "newStatefulSetReplicas", scaleDownTo)
}
// Return and wait for the pods to be deleted, which will call another reconcile
retryLaterDuration = 0
} else {
// Retry after five seconds to check if the replica management commands have been completed
retryLaterDuration = time.Second * 5
}
}
return
}
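// In short, a managed scale-down from N to N-1 pods spans several reconciles,
// one pod at a time (an illustrative sequence of the steps above):
//
//    1. mark pod N-1 "stopped" so it no longer receives traffic
//    2. evict its replicas onto the remaining pods (requestInProgress while waiting)
//    3. patch sts.spec.replicas to N-1 once the pod is empty
//    4. wait for the pod to actually be deleted, then the operation is complete
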
// cleanupManagedCloudScaleDown does the logic of cleaning-up an incomplete scale down operation.
// This will remove any bad readinessConditions that the scaleDown might have set when trying to scaleDown pods.
func cleanupManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
// First though, the scaleDown op might have set some pods to be "unready" before deletion. Undo that.
// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
util.SolrIsNotStoppedReadinessCondition: {
reason: PodStarted,
message: "Pod is not being deleted, traffic to the pod must be restarted",
status: true,
},
}
for i, pod := range podList {
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
err = e
return
} else {
podList[i] = *updatedPod
}
}
return
}
// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, nextClusterOperation *SolrClusterOp, err error) {
var desiredPods int
desiredPods, err = strconv.Atoi(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
return
}
configuredPods := int(*statefulSet.Spec.Replicas)
if configuredPods < desiredPods {
// The first thing to do is increase the number of pods the statefulSet is running
originalStatefulSet := statefulSet.DeepCopy()
statefulSet.Spec.Replicas = pointer.Int32(int32(desiredPods))
err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
if err != nil {
logger.Error(err, "Error while patching StatefulSet to increase the number of pods for the ScaleUp")
}
} else if len(podList) >= configuredPods {
nextClusterOperation = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "ScaleUp",
}
operationComplete = true
}
return
}
// hasAnyEphemeralData returns true if any of the given pods use ephemeral data for Solr storage, and false if all pods use persistent storage.
func hasAnyEphemeralData(solrPods []corev1.Pod) bool {
for _, pod := range solrPods {
for _, cond := range pod.Status.Conditions {
if cond.Type == util.SolrReplicasNotEvictedReadinessCondition {
return true
}
}
}
return false
}
func determineRollingUpdateClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, outOfDatePods util.OutOfDatePodSegmentation) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
if instance.Spec.UpdateStrategy.Method == solrv1beta1.ManagedUpdate && !outOfDatePods.IsEmpty() {
includesDataMigration := hasAnyEphemeralData(outOfDatePods.Running) || hasAnyEphemeralData(outOfDatePods.ScheduledForDeletion)
metadata := RollingUpdateMetadata{
RequiresReplicaMigration: includesDataMigration,
}
metaBytes, err := json.Marshal(metadata)
if err != nil {
return nil, 0, err
}
clusterOp = &SolrClusterOp{
Operation: UpdateLock,
Metadata: string(metaBytes),
}
}
return
}
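// As a hedged illustration, the serialized UpdateLock metadata produced above
// is just the RollingUpdateMetadata struct marshalled to JSON, e.g.:
//
//    {"requiresReplicaMigration":true}
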
// handleManagedCloudRollingUpdate does the logic of a managed and "locked" cloud rolling update operation.
// This will take many reconcile loops to complete, as it is deleting pods/moving replicas.
func handleManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, outOfDatePods util.OutOfDatePodSegmentation, hasReadyPod bool, availableUpdatedPodCount int, logger logr.Logger) (operationComplete bool, requestInProgress bool, retryLaterDuration time.Duration, nextClusterOp *SolrClusterOp, err error) {
// Manage the updating of out-of-spec pods, if the Managed UpdateStrategy has been specified.
updateLogger := logger.WithName("ManagedUpdateSelector")
// First check if all pods are up to date and ready. If so the rolling update is complete
configuredPods := int(*statefulSet.Spec.Replicas)
if configuredPods == availableUpdatedPodCount {
updateMetadata := &RollingUpdateMetadata{}
if clusterOp.Metadata != "" {
if err = json.Unmarshal([]byte(clusterOp.Metadata), &updateMetadata); err != nil {
updateLogger.Error(err, "Could not unmarshal metadata for rolling update operation")
}
}
operationComplete = true
// Only do a re-balancing for rolling restarts that migrated replicas
// If a scale-up will occur afterwards, skip the re-balancing, because it will occur after the scale-up anyway
if updateMetadata.RequiresReplicaMigration && *instance.Spec.Replicas <= *statefulSet.Spec.Replicas {
nextClusterOp = &SolrClusterOp{
Operation: BalanceReplicasLock,
Metadata: "RollingUpdateComplete",
}
}
return
} else if outOfDatePods.IsEmpty() {
// Just return and wait for the updated pods to come up healthy, these will call new reconciles, so there is nothing for us to do
return
} else {
// The out of date pods that have not been started, should all be updated immediately.
// There is no use "safely" updating pods which have not been started yet.
podsToUpdate := append([]corev1.Pod{}, outOfDatePods.NotStarted...)
for _, pod := range outOfDatePods.NotStarted {
updateLogger.Info("Pod killed for update.", "pod", pod.Name, "reason", "The solr container in the pod has not yet started, thus it is safe to update.")
}
// Don't exit on an error, which would only occur because of an HTTP Exception. Requeue later instead.
// We won't kill pods that we need the cluster state for, but we can kill the pods that are already not running.
// This is important for scenarios where there is a bad pod config and nothing is running, but we need to do
// a restart to get a working pod config.
state, retryLater, apiError := util.GetNodeReplicaState(ctx, instance, statefulSet, hasReadyPod, logger)
if apiError != nil {
return false, true, 0, nil, apiError
} else if !retryLater {
// If the cluster status has been successfully fetched, then add the pods scheduled for deletion
// This requires the clusterState to be fetched successfully to ensure that we know if there
// are replicas living on the pod
podsToUpdate = append(podsToUpdate, outOfDatePods.ScheduledForDeletion...)
// Pick which pods should be deleted for an update.
var additionalPodsToUpdate []corev1.Pod
additionalPodsToUpdate, retryLater =
util.DeterminePodsSafeToUpdate(instance, int(*statefulSet.Spec.Replicas), outOfDatePods, state, availableUpdatedPodCount, updateLogger)
// If we do not have the clusterState, it's not safe to update pods that are running
if !retryLater {
podsToUpdate = append(podsToUpdate, additionalPodsToUpdate...)
}
}
// Only actually delete a running pod if it has been evicted, or doesn't need eviction (persistent storage)
for _, pod := range podsToUpdate {
retryLaterDurationTemp, inProgTmp, errTemp := DeletePodForUpdate(ctx, r, instance, &pod, state.PodHasReplicas(instance, pod.Name), updateLogger)
requestInProgress = requestInProgress || inProgTmp
// Use the retryLaterDuration of the pod that requires a retry the soonest (smallest duration > 0)
if retryLaterDurationTemp > 0 && (retryLaterDurationTemp < retryLaterDuration || retryLaterDuration == 0) {
retryLaterDuration = retryLaterDurationTemp
}
if errTemp != nil {
err = errTemp
}
}
if retryLater && retryLaterDuration == 0 {
retryLaterDuration = time.Second * 10
}
}
return
}
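// Put differently, a single pass of handleManagedCloudRollingUpdate takes one
// of the following paths (an illustrative summary of the branches above):
//
//    1. all pods updated & available -> complete; queue BalanceReplicas if replicas were migrated
//    2. no out-of-date pods left     -> wait for restarted pods to come up healthy
//    3. otherwise                    -> delete not-started pods immediately, fetch the cluster
//                                       state, and delete only running pods deemed safe to update
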
// cleanupManagedCloudRollingUpdate does the logic of cleaning-up an incomplete rolling update operation.
// This will remove any bad readinessConditions that the rollingUpdate might have set when trying to restart pods.
func cleanupManagedCloudRollingUpdate(ctx context.Context, r *SolrCloudReconciler, podList []corev1.Pod, logger logr.Logger) (err error) {
// First though, the rolling update op might have set some pods to be "unready" before deletion. Undo that.
// Before doing anything to the pod, make sure that the pods do not have a stopped readiness condition
er := EvictingReplicas
readinessConditions := map[corev1.PodConditionType]podReadinessConditionChange{
util.SolrIsNotStoppedReadinessCondition: {
reason: PodStarted,
message: "Pod is not being deleted, traffic to the pod must be restarted",
status: true,
},
util.SolrReplicasNotEvictedReadinessCondition: {
// Only set this condition if the condition hasn't been changed since pod start
// We do not want to over-write future states later down the eviction pipeline
matchPreviousReason: &er,
reason: PodStarted,
message: "Pod is not being deleted, ephemeral data is no longer being evicted",
status: true,
},
}
for i, pod := range podList {
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
err = e
return
} else {
podList[i] = *updatedPod
}
}
return
}
// clearClusterOpLockWithPatch simply removes any clusterOp for the given statefulSet.
func clearClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
originalStatefulSet := statefulSet.DeepCopy()
clearClusterOpLock(statefulSet)
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to remove unneeded clusterOpLock annotation", "reason", reason)
} else {
logger.Info("Removed unneeded clusterOpLock annotation from statefulSet", "reason", reason)
}
return
}
// setNextClusterOpLockWithPatch clears the current clusterOp and sets the next clusterOp for the given statefulSet, then patches the statefulSet.
func setNextClusterOpLockWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, nextClusterOp *SolrClusterOp, reason string, logger logr.Logger) (err error) {
originalStatefulSet := statefulSet.DeepCopy()
clearClusterOpLock(statefulSet)
if err = setClusterOpLock(statefulSet, *nextClusterOp); err != nil {
logger.Error(err, "Error while marshalling next clusterOpLock annotation after finishing previous clusterOp", "reason", reason)
return
}
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to set next clusterOpLock annotation after finishing previous clusterOp", "reason", reason)
} else {
logger.Info("Set next clusterOpLock annotation on statefulSet after finishing previous clusterOp", "reason", reason)
}
return
}
// enqueueCurrentClusterOpForRetryWithPatch adds the current clusterOp to the clusterOpRetryQueue, and clears the current cluster Op.
// This method will send the StatefulSet patch to the API Server.
func enqueueCurrentClusterOpForRetryWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, reason string, logger logr.Logger) (err error) {
originalStatefulSet := statefulSet.DeepCopy()
hasOp, err := enqueueCurrentClusterOpForRetry(statefulSet)
if hasOp && err == nil {
err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
}
if err != nil {
logger.Error(err, "Error while patching StatefulSet to enqueue clusterOp for retry", "reason", reason)
} else if hasOp {
logger.Info("Enqueued current clusterOp to continue later", "reason", reason)
}
return err
}
// retryNextQueuedClusterOpWithPatch removes the first clusterOp from the clusterOpRetryQueue, and sets it as the current cluster Op.
// This method will send the StatefulSet patch to the API Server.
func retryNextQueuedClusterOpWithPatch(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, clusterOpQueue []SolrClusterOp, logger logr.Logger) (err error) {
originalStatefulSet := statefulSet.DeepCopy()
hasOp, err := retryNextQueuedClusterOpWithQueue(statefulSet, clusterOpQueue)
if hasOp && err == nil {
err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet))
}
if err != nil {
logger.Error(err, "Error while patching StatefulSet to retry next queued clusterOp")
} else if hasOp {
logger.Info("Retrying next queued clusterOp")
}
return err
}
// scaleCloudUnmanaged does simple scaling of a SolrCloud without moving replicas.
// This is not a "locked" cluster operation, and does not block other cluster operations from taking place.
func scaleCloudUnmanaged(ctx context.Context, r *SolrCloudReconciler, statefulSet *appsv1.StatefulSet, scaleTo int, logger logr.Logger) (err error) {
// Scale the StatefulSet directly to the requested size; no replica migration happens here.
patchedStatefulSet := statefulSet.DeepCopy()
patchedStatefulSet.Spec.Replicas = pointer.Int32(int32(scaleTo))
if err = r.Patch(ctx, patchedStatefulSet, client.StrategicMergeFrom(statefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to scale SolrCloud.", "fromNodes", *statefulSet.Spec.Replicas, "toNodes", scaleTo)
}
return err
}
// evictAllPods is currently not used; it may be used in the future if we want to delete all data when scaling down to zero.
func evictAllPods(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, podList []corev1.Pod, readinessConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (podsAreEmpty bool, err error) {
// If there are no pods, we can't empty them. Just return true
if len(podList) == 0 {
return true, nil
}
for i, pod := range podList {
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, &pod, readinessConditions, logger); e != nil {
err = e
return
} else {
podList[i] = *updatedPod
}
}
// Delete all collections & data, the user wants no data left if scaling the solrcloud down to 0
// This is a much different operation to deleting the SolrCloud/StatefulSet all-together
// TODO: Implement delete all collections. Currently just leave the data
//if err, podsAreEmpty = util.DeleteAllCollectionsIfNecessary(ctx, instance, "scaleDown", logger); err != nil {
// logger.Error(err, "Error while evicting all collections in SolrCloud, when scaling down SolrCloud to 0 pods")
//}
podsAreEmpty = true
return
}
func evictSinglePod(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, scaleDownTo int, podList []corev1.Pod, readinessConditions map[corev1.PodConditionType]podReadinessConditionChange, logger logr.Logger) (podIsEmpty bool, requestInProgress bool, err error) {
var pod *corev1.Pod
podName := instance.GetSolrPodName(scaleDownTo)
for _, p := range podList {
if p.Name == podName {
pod = &p
break
}
}
podHasReplicas := true
if replicas, e := getReplicasForPod(ctx, instance, podName, logger); e != nil {
return false, false, e
} else {
podHasReplicas = len(replicas) > 0
}
// The pod doesn't exist, we cannot empty it
if pod == nil {
return !podHasReplicas, false, errors.New("Could not find pod " + podName + " when trying to migrate replicas to scale down pod.")
}
if updatedPod, e := EnsurePodReadinessConditions(ctx, r, pod, readinessConditions, logger); e != nil {
err = e
return
} else {
pod = updatedPod
}
// Only evict from the pod if it contains replicas in the clusterState
var canDeletePod bool
if err, canDeletePod, requestInProgress = util.EvictReplicasForPodIfNecessary(ctx, instance, pod, podHasReplicas, "scaleDown", logger); err != nil {
logger.Error(err, "Error while evicting replicas on Pod, when scaling down SolrCloud", "pod", pod.Name)
} else if canDeletePod {
// The pod previously had replicas, so loop back in the next reconcile to make sure that the pod doesn't
// have replicas anymore even if the previous evict command was successful.
// If there are still replicas, it will start the eviction process again
podIsEmpty = !podHasReplicas
}
return
}
func getReplicasForPod(ctx context.Context, cloud *solrv1beta1.SolrCloud, podName string, logger logr.Logger) (replicas []string, err error) {
clusterResp := &solr_api.SolrClusterStatusResponse{}
queryParams := url.Values{}
queryParams.Add("action", "CLUSTERSTATUS")
err = solr_api.CallCollectionsApi(ctx, cloud, queryParams, clusterResp)
if _, apiError := solr_api.CheckForCollectionsApiError("CLUSTERSTATUS", clusterResp.ResponseHeader, clusterResp.Error); apiError != nil {
err = apiError
}
podNodeName := util.SolrNodeName(cloud, podName)
if err == nil {
for _, colState := range clusterResp.ClusterStatus.Collections {
for _, shardState := range colState.Shards {
for replica, replicaState := range shardState.Replicas {
if replicaState.NodeName == podNodeName {
replicas = append(replicas, replica)
}
}
}
}
} else {
logger.Error(err, "Error retrieving cluster status, cannot determine if pod has replicas")
}
return
}
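// For orientation, an abbreviated CLUSTERSTATUS payload that the loops above
// walk might look like this (field names per Solr's Collections API; the
// collection, node, and core names are made up):
//
//    {"cluster":{"collections":{"techproducts":{"shards":{"shard1":{"replicas":{
//        "core_node2":{"node_name":"pod-0.solrcloud-headless.ns:8983_solr","state":"active"}}}}}}}}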