Add PVC volume expansion #712

Open
wants to merge 5 commits into base: main
Changes from 1 commit
59 changes: 58 additions & 1 deletion controllers/solr_cluster_ops_util.go
@@ -27,6 +27,7 @@ import (
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
"net/url"
@@ -53,6 +54,7 @@ const (
ScaleUpLock SolrClusterOperationType = "ScalingUp"
UpdateLock SolrClusterOperationType = "RollingUpdate"
BalanceReplicasLock SolrClusterOperationType = "BalanceReplicas"
PvcExpansionLock SolrClusterOperationType = "PVCExpansion"
)

// RollingUpdateMetadata contains metadata for rolling update cluster operations.
@@ -150,6 +152,60 @@ func retryNextQueuedClusterOpWithQueue(statefulSet *appsv1.StatefulSet, clusterO
return hasOp, err
}

func determinePvcExpansionClusterOpLockIfNecessary(instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
if instance.Spec.StorageOptions.PersistentStorage != nil &&
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage() != nil &&
instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage().String() != statefulSet.Annotations[util.StorageMinimumSizeAnnotation] {
// First make sure that the new Storage request is greater than what already is set.
// PVCs cannot be shrunk
newSize := instance.Spec.StorageOptions.PersistentStorage.PersistentVolumeClaimTemplate.Spec.Resources.Requests.Storage()
// If there is no old size to update, the StatefulSet can just be set to use the new PVC size without any issue.
// Only do a cluster operation if we are expanding from an existing size to a new size
if oldSizeStr, hasOldSize := statefulSet.Annotations[util.StorageMinimumSizeAnnotation]; hasOldSize {
if oldSize, e := resource.ParseQuantity(oldSizeStr); e != nil {
err = e
// TODO: add an event
} else {
// Only update to the new size if it is bigger; PVCs cannot be shrunk
if newSize.Cmp(oldSize) > 0 {
clusterOp = &SolrClusterOp{
Operation: PvcExpansionLock,
Metadata: newSize.String(),
}
}
// TODO: add an event saying that we cannot shrink PVCs
}
}
}
return
}
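
The two TODOs above call for events. A hedged sketch of what those emissions could look like, assuming the function were handed a record.EventRecorder (named recorder here; this diff does not wire one up, and the event reasons are made up for illustration):

// Hypothetical sketch for the TODOs above, not part of this PR.
// Assumes a recorder of type record.EventRecorder
// (k8s.io/client-go/tools/record) is in scope.
if e != nil {
	recorder.Event(instance, corev1.EventTypeWarning, "InvalidStorageSize",
		fmt.Sprintf("could not parse the existing minimum PVC size %q: %v", oldSizeStr, e))
} else if newSize.Cmp(oldSize) <= 0 {
	recorder.Event(instance, corev1.EventTypeWarning, "PVCShrinkIgnored",
		fmt.Sprintf("requested PVC size %s is not larger than the current minimum %s; PVCs cannot be shrunk",
			newSize.String(), oldSize.String()))
}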

// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
Comment on lines +183 to +184 (Contributor):

Suggested change
- // handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
- // This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
+ // handlePvcExpansion handles the logic of a persistent volume claim expansion operation.

func handlePvcExpansion(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, logger logr.Logger) (operationComplete bool, retryLaterDuration time.Duration, err error) {
var newSize resource.Quantity
newSize, err = resource.ParseQuantity(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert PvcExpansion metadata to a resource.Quantity, as it represents the new size of PVCs", "metadata", clusterOp.Metadata)
return
}
operationComplete, err = r.expandPVCs(ctx, instance, statefulSet.Spec.Selector.MatchLabels, newSize, logger)
if err == nil && operationComplete {
originalStatefulSet := statefulSet.DeepCopy()
statefulSet.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
statefulSet.Spec.Template.Annotations[util.StorageMinimumSizeAnnotation] = newSize.String()
if err = r.Patch(ctx, statefulSet, client.StrategicMergeFrom(originalStatefulSet)); err != nil {
logger.Error(err, "Error while patching StatefulSet to set the new minimum PVC size after PVCs the completion of PVC resizing", "newSize", newSize)
operationComplete = false
}
// Return and wait for the StatefulSet to be updated which will call the reconcile to start the rolling restart
retryLaterDuration = 0
} else if err == nil {
retryLaterDuration = time.Second * 5
}
return
}
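
handlePvcExpansion round-trips the target size through its string form in clusterOp.Metadata, and the completion patch bumps the annotation on both the StatefulSet and its pod template; the pod-template change is what triggers the rolling restart. A standalone, illustrative sketch (not part of this PR) of the resource.Quantity semantics this code relies on:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// String() preserves the original notation, so storing a size as
	// clusterOp metadata and parsing it back is lossless.
	size := resource.MustParse("20Gi")
	fmt.Println(size.String()) // 20Gi

	// Cmp compares values, not notations: "2Gi" and "2048Mi" are equal,
	// which is why the shrink guard uses Cmp rather than string equality.
	fmt.Println(resource.MustParse("2Gi").Cmp(resource.MustParse("2048Mi"))) // 0

	oldSize, newSize := resource.MustParse("10Gi"), resource.MustParse("20Gi")
	fmt.Println(newSize.Cmp(oldSize) > 0) // true: expansion is allowed
}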

func determineScaleClusterOpLockIfNecessary(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, scaleDownOpIsQueued bool, podList []corev1.Pod, blockReconciliationOfStatefulSet bool, logger logr.Logger) (clusterOp *SolrClusterOp, retryLaterDuration time.Duration, err error) {
desiredPods := int(*instance.Spec.Replicas)
configuredPods := int(*statefulSet.Spec.Replicas)
@@ -291,7 +347,8 @@ func cleanupManagedCloudScaleDown(ctx context.Context, r *SolrCloudReconciler, p
// handleManagedCloudScaleUp does the logic of a managed and "locked" cloud scale up operation.
// This will likely take many reconcile loops to complete, as it is moving replicas to the pods that have recently been scaled up.
func handleManagedCloudScaleUp(ctx context.Context, r *SolrCloudReconciler, instance *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, clusterOp *SolrClusterOp, podList []corev1.Pod, logger logr.Logger) (operationComplete bool, nextClusterOperation *SolrClusterOp, err error) {
desiredPods, err := strconv.Atoi(clusterOp.Metadata)
desiredPods := 0
desiredPods, err = strconv.Atoi(clusterOp.Metadata)
if err != nil {
logger.Error(err, "Could not convert ScaleUp metadata to int, as it represents the number of nodes to scale to", "metadata", clusterOp.Metadata)
return
Expand Down
71 changes: 61 additions & 10 deletions controllers/solrcloud_controller.go
@@ -22,6 +22,7 @@
"crypto/md5"
"fmt"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/runtime"
"reflect"
"sort"
@@ -483,6 +484,8 @@
operationComplete, nextClusterOperation, err = handleManagedCloudScaleUp(ctx, r, instance, statefulSet, clusterOp, podList, logger)
case BalanceReplicasLock:
operationComplete, requestInProgress, retryLaterDuration, err = util.BalanceReplicasForCluster(ctx, instance, statefulSet, clusterOp.Metadata, clusterOp.Metadata, logger)
case PvcExpansionLock:
operationComplete, retryLaterDuration, err = handlePvcExpansion(ctx, r, instance, statefulSet, clusterOp, logger)
default:
operationFound = false
// This shouldn't happen, but we don't want to be stuck if it does.
@@ -550,6 +553,12 @@
clusterOpQueue[queueIdx] = *clusterOp
clusterOp = nil
}
clusterOp, retryLaterDuration, err = determinePvcExpansionClusterOpLockIfNecessary(instance, statefulSet)
// If the new clusterOperation is an update to a queued clusterOp, just change the operation that is already queued
if queueIdx, opIsQueued := queuedRetryOps[UpdateLock]; clusterOp != nil && opIsQueued {
clusterOpQueue[queueIdx] = *clusterOp
clusterOp = nil
}

// If a non-managed scale needs to take place, this method will update the StatefulSet without starting
// a "locked" cluster operation
@@ -932,6 +941,46 @@
return nil
}

func (r *SolrCloudReconciler) expandPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, newSize resource.Quantity, logger logr.Logger) (expansionComplete bool, err error) {
var pvcList corev1.PersistentVolumeClaimList
pvcList, err = r.getPVCList(ctx, cloud, pvcLabelSelector)
if err != nil {
return
}
expansionCompleteCount := 0
for _, pvcItem := range pvcList.Items {
if pvcExpansionComplete, e := r.expandPVC(ctx, &pvcItem, newSize, logger); e != nil {
err = e
} else if pvcExpansionComplete {
expansionCompleteCount += 1
}
}
// If all PVCs have been expanded, then we are done
expansionComplete = err == nil && expansionCompleteCount == len(pvcList.Items)
return
}

func (r *SolrCloudReconciler) expandPVC(ctx context.Context, pvc *corev1.PersistentVolumeClaim, newSize resource.Quantity, logger logr.Logger) (expansionComplete bool, err error) {
// If the current capacity is >= the new size, then there is nothing to do; expansion is complete
if pvc.Status.Capacity.Storage().Cmp(newSize) >= 0 {
// TODO: Eventually use the pvc.Status.AllocatedResources and pvc.Status.AllocatedResourceStatuses to determine the status of PVC Expansion and react to failures
expansionComplete = true
} else if !pvc.Spec.Resources.Requests.Storage().Equal(newSize) {
// Update the pvc if the capacity request is different.
// The newSize might be smaller than the currently requested size, but this is supported: the last size might
// have been too big for the storage quota, so it was lowered.
// As long as the PVC's current capacity is lower than the new size, we are still good to update the PVC.
originalPvc := pvc.DeepCopy()
pvc.Spec.Resources.Requests[corev1.ResourceStorage] = newSize
if err = r.Patch(ctx, pvc, client.StrategicMergeFrom(originalPvc)); err != nil {
logger.Error(err, "Error while expanding PersistentVolumeClaim size", "persistentVolumeClaim", pvc.Name, "size", newSize)
} else {
logger.Info("Expanded PersistentVolumeClaim size", "persistentVolumeClaim", pvc.Name, "size", newSize)
}
}
return
}
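
One caveat worth noting: the API server rejects the patch above unless the PVC's StorageClass sets allowVolumeExpansion: true. A hypothetical pre-flight helper, not part of this PR (assumes an extra storagev1 import of k8s.io/api/storage/v1 and that the reconciler embeds the controller-runtime client, so r.Get is available):

// Hypothetical helper, not part of this PR: checks whether the PVC's
// StorageClass permits expansion before attempting to patch its size.
func (r *SolrCloudReconciler) storageClassAllowsExpansion(ctx context.Context, pvc *corev1.PersistentVolumeClaim) (bool, error) {
	if pvc.Spec.StorageClassName == nil || *pvc.Spec.StorageClassName == "" {
		// No explicit class; the cluster default (if any) would apply,
		// so it would need to be resolved separately.
		return false, nil
	}
	storageClass := &storagev1.StorageClass{}
	if err := r.Get(ctx, client.ObjectKey{Name: *pvc.Spec.StorageClassName}, storageClass); err != nil {
		return false, err
	}
	return storageClass.AllowVolumeExpansion != nil && *storageClass.AllowVolumeExpansion, nil
}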

// Logic derived from:
// - https://book.kubebuilder.io/reference/using-finalizers.html
// - https://github.com/pravega/zookeeper-operator/blob/v0.2.9/pkg/controller/zookeepercluster/zookeepercluster_controller.go#L629
@@ -978,16 +1027,15 @@
return nil
}

func (r *SolrCloudReconciler) getPVCCount(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (pvcCount int, err error) {
func (r *SolrCloudReconciler) getPVCCount(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (int, error) {
pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
if err != nil {
return -1, err
}
pvcCount = len(pvcList.Items)
return pvcCount, nil
return len(pvcList.Items), nil
}

func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, pvcLabelSelector map[string]string, logger logr.Logger) (err error) {
func (r *SolrCloudReconciler) cleanupOrphanPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, statefulSet *appsv1.StatefulSet, pvcLabelSelector map[string]string, logger logr.Logger) error {
// this check should make sure we do not delete the PVCs before the STS has scaled down
if cloud.Status.ReadyReplicas == cloud.Status.Replicas {
pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
@@ -1003,36 +1051,39 @@
// Don't use the Spec replicas here, because we might be rolling down 1-by-1 and the PVCs for
// soon-to-be-deleted pods should not be deleted until the pod is deleted.
if util.IsPVCOrphan(pvcItem.Name, *statefulSet.Spec.Replicas) {
r.deletePVC(ctx, pvcItem, logger)
if e := r.deletePVC(ctx, pvcItem, logger); e != nil {
Check failure on line 1054 in controllers/solrcloud_controller.go (GitHub Actions / Build & Check (Lint & Unit Test) (1.22)): r.deletePVC(ctx, pvcItem, logger) (no value) used as value
err = e
}
}
}
}
return err
}
return nil
}

func (r *SolrCloudReconciler) getPVCList(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (pvList corev1.PersistentVolumeClaimList, err error) {
func (r *SolrCloudReconciler) getPVCList(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string) (corev1.PersistentVolumeClaimList, error) {
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: pvcLabelSelector,
})
pvclistOps := &client.ListOptions{
pvcListOps := &client.ListOptions{
Namespace: cloud.Namespace,
LabelSelector: selector,
}
pvcList := &corev1.PersistentVolumeClaimList{}
err = r.Client.List(ctx, pvcList, pvclistOps)
err = r.Client.List(ctx, pvcList, pvcListOps)
return *pvcList, err
}

func (r *SolrCloudReconciler) cleanUpAllPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, logger logr.Logger) (err error) {
func (r *SolrCloudReconciler) cleanUpAllPVCs(ctx context.Context, cloud *solrv1beta1.SolrCloud, pvcLabelSelector map[string]string, logger logr.Logger) error {
pvcList, err := r.getPVCList(ctx, cloud, pvcLabelSelector)
if err != nil {
return err
}
for _, pvcItem := range pvcList.Items {
r.deletePVC(ctx, pvcItem, logger)
}
return nil
return err
}

func (r *SolrCloudReconciler) deletePVC(ctx context.Context, pvcItem corev1.PersistentVolumeClaim, logger logr.Logger) {
24 changes: 24 additions & 0 deletions controllers/util/solr_util.go
@@ -57,6 +57,7 @@ const (
// These are to be saved on a statefulSet update
ClusterOpsLockAnnotation = "solr.apache.org/clusterOpsLock"
ClusterOpsRetryQueueAnnotation = "solr.apache.org/clusterOpsRetryQueue"
StorageMinimumSizeAnnotation = "solr.apache.org/storageMinimumSize"

SolrIsNotStoppedReadinessCondition = "solr.apache.org/isNotStopped"
SolrReplicasNotEvictedReadinessCondition = "solr.apache.org/replicasNotEvicted"
@@ -200,6 +201,13 @@ func GenerateStatefulSet(solrCloud *solr.SolrCloud, solrCloudStatus *solr.SolrCl
Spec: pvc.Spec,
},
}
if pvc.Spec.Resources.Requests.Storage() != nil {
annotations[StorageMinimumSizeAnnotation] = pvc.Spec.Resources.Requests.Storage().String()
if podAnnotations == nil {
podAnnotations = make(map[string]string, 1)
}
podAnnotations[StorageMinimumSizeAnnotation] = pvc.Spec.Resources.Requests.Storage().String()
}
} else {
ephemeralVolume := corev1.Volume{
Name: solrDataVolumeName,
@@ -680,6 +688,22 @@ func MaintainPreservedStatefulSetFields(expected, found *appsv1.StatefulSet) {
}
expected.Annotations[ClusterOpsRetryQueueAnnotation] = queue
}
if storage, hasStorage := found.Annotations[StorageMinimumSizeAnnotation]; hasStorage {
if expected.Annotations == nil {
expected.Annotations = make(map[string]string, 1)
}
expected.Annotations[StorageMinimumSizeAnnotation] = storage
}
}
if found.Spec.Template.Annotations != nil {
// Note: the Pod template storage annotation is used to start a rolling restart;
// it should always match the StatefulSet's storage annotation
if storage, hasStorage := found.Spec.Template.Annotations[StorageMinimumSizeAnnotation]; hasStorage {
if expected.Spec.Template.Annotations == nil {
expected.Spec.Template.Annotations = make(map[string]string, 1)
}
expected.Spec.Template.Annotations[StorageMinimumSizeAnnotation] = storage
}
}

// Scaling (i.e. changing) the number of replicas in the SolrCloud statefulSet is handled during the clusterOps