DFBUGS-319: Fix rdspec and protectedpvcs condition #387

Open · wants to merge 6 commits into release-4.17
93 changes: 63 additions & 30 deletions internal/controller/drplacementcontrol.go
@@ -874,7 +874,7 @@ func (d *DRPCInstance) RunRelocate() (bool, error) {
		addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionAvailable, d.instance.Generation,
			d.getConditionStatusForTypeAvailable(), string(d.instance.Status.Phase), errMsg)

-		return !done, fmt.Errorf(errMsg)
+		return !done, fmt.Errorf("%s", errMsg)
	}

	if d.getLastDRState() != rmn.Relocating && !d.validatePeerReady() {
@@ -935,8 +935,7 @@ func (d *DRPCInstance) ensureCleanupAndVolSyncReplicationSetup(srcCluster string
	// in the MW, but the VRGs in the vrgs slice are fetched using MCV.
	vrg, ok := d.vrgs[srcCluster]
	if !ok || len(vrg.Spec.VolSync.RDSpec) != 0 {
-		return fmt.Errorf(fmt.Sprintf("Waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v",
-			srcCluster, ok))
+		return fmt.Errorf("waiting for RDSpec count on cluster %s to go to zero. VRG OK? %v", srcCluster, ok)
	}

	err = d.EnsureCleanup(srcCluster)
@@ -1406,6 +1405,8 @@ func (d *DRPCInstance) moveVRGToSecondaryEverywhere() bool {
}

func (d *DRPCInstance) cleanupSecondaries(skipCluster string) (bool, error) {
+	d.log.Info("Cleaning up secondaries.")
+
	for _, clusterName := range rmnutil.DRPolicyClusterNames(d.drPolicy) {
		if skipCluster == clusterName {
			continue
@@ -1520,18 +1521,17 @@ func (d *DRPCInstance) createVRGManifestWork(homeCluster string, repState rmn.Re
	}

	// create VRG ManifestWork
-	d.log.Info("Creating VRG ManifestWork",
+	d.log.Info("Creating VRG ManifestWork", "ReplicationState", repState,
		"Last State:", d.getLastDRState(), "cluster", homeCluster)

-	vrg := d.generateVRG(homeCluster, repState)
-	vrg.Spec.VolSync.Disabled = d.volSyncDisabled
+	vrg := d.newVRG(homeCluster, repState)

	annotations := make(map[string]string)

	annotations[DRPCNameAnnotation] = d.instance.Name
	annotations[DRPCNamespaceAnnotation] = d.instance.Namespace

-	if err := d.mwu.CreateOrUpdateVRGManifestWork(
+	if _, err := d.mwu.CreateOrUpdateVRGManifestWork(
		d.instance.Name, d.vrgNamespace,
		homeCluster, vrg, annotations); err != nil {
		d.log.Error(err, "failed to create or update VolumeReplicationGroup manifest")
@@ -1548,12 +1548,57 @@ func (d *DRPCInstance) ensureVRGManifestWork(homeCluster string) error {
	d.log.Info("Ensure VRG ManifestWork",
		"Last State:", d.getLastDRState(), "cluster", homeCluster)

-	cachedVrg := d.vrgs[homeCluster]
-	if cachedVrg == nil {
-		return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
-	}
-
-	return d.createVRGManifestWork(homeCluster, cachedVrg.Spec.ReplicationState)
+	mw, mwErr := d.mwu.FindManifestWorkByType(rmnutil.MWTypeVRG, homeCluster)
+	if mwErr != nil {
+		if errors.IsNotFound(mwErr) {
+			cachedVrg := d.vrgs[homeCluster]
+			if cachedVrg == nil {
+				return fmt.Errorf("failed to get vrg from cluster %s", homeCluster)
+			}
+
+			return d.createVRGManifestWork(homeCluster, cachedVrg.Spec.ReplicationState)
+		}
+
+		return fmt.Errorf("ensure VRG ManifestWork failed (%w)", mwErr)
+	}
+
+	vrg, err := rmnutil.ExtractVRGFromManifestWork(mw)
+	if err != nil {
+		return fmt.Errorf("error extracting VRG from ManifestWork for cluster %s. Error: %w", homeCluster, err)
+	}
+
+	d.updateVRGOptionalFields(vrg, homeCluster)
+
+	return d.mwu.UpdateVRGManifestWork(vrg, mw)
}

+// updateVRGOptionalFields ensures that the optional fields in the VRG object are up to date.
+// This function does not modify the following fields:
+// - ObjectMeta.Name
+// - ObjectMeta.Namespace
+// - Spec.PVCSelector
+// - Spec.ReplicationState
+// - Spec.PrepareForFinalSync
+// - Spec.RunFinalSync
+// - Spec.VolSync.RDSpec
+//
+// These fields are either set during the initial creation of the VRG (e.g., name and namespace)
+// or updated as needed, such as the PrepareForFinalSync and RunFinalSync fields.
+func (d *DRPCInstance) updateVRGOptionalFields(vrg *rmn.VolumeReplicationGroup, homeCluster string) {
+	vrg.ObjectMeta.Annotations = map[string]string{
+		DestinationClusterAnnotationKey: homeCluster,
+		DoNotDeletePVCAnnotation:        d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
+		DRPCUIDAnnotation:               string(d.instance.UID),
+		rmnutil.IsCGEnabledAnnotation:   d.instance.GetAnnotations()[rmnutil.IsCGEnabledAnnotation],
+	}
+
+	vrg.Spec.ProtectedNamespaces = d.instance.Spec.ProtectedNamespaces
+	vrg.Spec.S3Profiles = AvailableS3Profiles(d.drClusters)
+	vrg.Spec.KubeObjectProtection = d.instance.Spec.KubeObjectProtection
+	vrg.Spec.Async = d.generateVRGSpecAsync()
+	vrg.Spec.Sync = d.generateVRGSpecSync()
+	vrg.Spec.VolSync.Disabled = d.volSyncDisabled
+	d.setVRGAction(vrg)
+}

func (d *DRPCInstance) ensurePlacement(homeCluster string) error {
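With this change, ensureVRGManifestWork no longer regenerates the whole VRG when its ManifestWork already exists: it extracts the VRG embedded in the ManifestWork and refreshes only the optional fields, preserving in-flight state such as Spec.VolSync.RDSpec and the final-sync flags (see the comment on updateVRGOptionalFields above). The recreate-from-cached-VRG path is kept for the case where the ManifestWork is missing, e.g. after hub recovery.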
@@ -1588,31 +1633,20 @@ func (d *DRPCInstance) setVRGAction(vrg *rmn.VolumeReplicationGroup) {
	vrg.Spec.Action = action
}

-func (d *DRPCInstance) generateVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
+func (d *DRPCInstance) newVRG(dstCluster string, repState rmn.ReplicationState) rmn.VolumeReplicationGroup {
	vrg := rmn.VolumeReplicationGroup{
		TypeMeta:   metav1.TypeMeta{Kind: "VolumeReplicationGroup", APIVersion: "ramendr.openshift.io/v1alpha1"},
		ObjectMeta: metav1.ObjectMeta{
			Name:      d.instance.Name,
			Namespace: d.vrgNamespace,
-			Annotations: map[string]string{
-				DestinationClusterAnnotationKey: dstCluster,
-				DoNotDeletePVCAnnotation:        d.instance.GetAnnotations()[DoNotDeletePVCAnnotation],
-				DRPCUIDAnnotation:               string(d.instance.UID),
-				rmnutil.IsCGEnabledAnnotation:   d.instance.GetAnnotations()[rmnutil.IsCGEnabledAnnotation],
-			},
		},
		Spec: rmn.VolumeReplicationGroupSpec{
-			PVCSelector:          d.instance.Spec.PVCSelector,
-			ProtectedNamespaces:  d.instance.Spec.ProtectedNamespaces,
-			ReplicationState:     repState,
-			S3Profiles:           AvailableS3Profiles(d.drClusters),
-			KubeObjectProtection: d.instance.Spec.KubeObjectProtection,
+			PVCSelector:      d.instance.Spec.PVCSelector,
+			ReplicationState: repState,
		},
	}

-	d.setVRGAction(&vrg)
-	vrg.Spec.Async = d.generateVRGSpecAsync()
-	vrg.Spec.Sync = d.generateVRGSpecSync()
+	d.updateVRGOptionalFields(&vrg, dstCluster)

	return vrg
}
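Both paths now converge: newVRG seeds only the identity fields (name, namespace, PVCSelector, ReplicationState) and delegates everything else to updateVRGOptionalFields, the same helper that ensureVRGManifestWork applies when refreshing an existing ManifestWork. Below is a minimal, self-contained sketch of this find-or-create-then-refresh pattern; the types and helpers (vrg, store, ensure) are simplified stand-ins for illustration, not the ramen APIs:

```go
package main

import "fmt"

// Simplified stand-ins: illustrative only, not the actual ramen types.
type vrg struct {
	name     string   // identity: set once at creation, never touched below
	profiles []string // "optional" fields that may drift and must be re-synced
	disabled bool
}

type store struct{ mw *vrg } // stands in for the per-cluster ManifestWork

// updateOptionalFields mirrors updateVRGOptionalFields: refresh the fields
// that can drift, leave identity and replication-state fields alone.
func updateOptionalFields(v *vrg, profiles []string, disabled bool) {
	v.profiles = profiles
	v.disabled = disabled
}

// newVRG funnels creation through the same helper, so the create and
// update paths can never disagree about the optional fields.
func newVRG(name string, profiles []string, disabled bool) *vrg {
	v := &vrg{name: name}
	updateOptionalFields(v, profiles, disabled)

	return v
}

// ensure follows ensureVRGManifestWork: find the existing object, create
// it if missing, otherwise refresh it in place.
func (s *store) ensure(name string, profiles []string, disabled bool) *vrg {
	if s.mw == nil { // corresponds to the errors.IsNotFound branch
		s.mw = newVRG(name, profiles, disabled)

		return s.mw
	}

	updateOptionalFields(s.mw, profiles, disabled)

	return s.mw
}

func main() {
	s := &store{}
	fmt.Printf("%+v\n", *s.ensure("app", []string{"s3-a"}, false)) // creates
	fmt.Printf("%+v\n", *s.ensure("app", []string{"s3-b"}, true))  // refreshes in place
}
```

The payoff of the shared helper is that the create and update paths cannot drift apart: a new "optional" field only needs to be wired up in one place.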
Expand Down Expand Up @@ -1760,7 +1794,7 @@ func (d *DRPCInstance) EnsureCleanup(clusterToSkip string) error {
}

if !clean {
msg := "cleaning secondaries"
msg := "cleaning up secondaries"
addOrUpdateCondition(&d.instance.Status.Conditions, rmn.ConditionPeerReady, d.instance.Generation,
metav1.ConditionFalse, rmn.ReasonCleaning, msg)

@@ -1775,8 +1809,7 @@

//nolint:gocognit
func (d *DRPCInstance) cleanupForVolSync(clusterToSkip string) error {
-	d.log.Info("VolSync needs both VRGs. No need to clean up secondary")
-	d.log.Info("Ensure secondary on peer")
+	d.log.Info("VolSync needs both VRGs. Ensure secondary setup on peer")

	peersReady := true
@@ -1793,7 +1826,7 @@

	// Recreate the VRG ManifestWork for the secondary. This typically happens during Hub Recovery.
	if errors.IsNotFound(err) {
-		err := d.createVolSyncDestManifestWork(clusterToSkip)
+		err := d.ensureVolSyncSetup(clusterToSkip)
		if err != nil {
			return err
		}
6 changes: 3 additions & 3 deletions internal/controller/drplacementcontrol_controller.go
@@ -1700,7 +1700,7 @@ func getVRGsFromManagedClusters(

		vrgs[drCluster.Name] = vrg

-		log.Info("VRG location", "VRG on", drCluster.Name)
+		log.Info("VRG location", "VRG on", drCluster.Name, "replicationState", vrg.Spec.ReplicationState)
	}

	// We are done if we successfully queried all drClusters
@@ -2810,7 +2810,7 @@ func adoptExistingVRGManifestWork(
	annotations[DRPCNameAnnotation] = drpc.Name
	annotations[DRPCNamespaceAnnotation] = drpc.Namespace

-	err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
+	_, err := mwu.CreateOrUpdateVRGManifestWork(drpc.Name, vrgNamespace, cluster, *vrg, annotations)
	if err != nil {
		log.Info("error updating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)
	}
@@ -2847,7 +2847,7 @@ func adoptOrphanVRG(

	vrg.Annotations[DRPCUIDAnnotation] = string(drpc.UID)

-	if err := mwu.CreateOrUpdateVRGManifestWork(
+	if _, err := mwu.CreateOrUpdateVRGManifestWork(
		drpc.Name, vrgNamespace,
		cluster, *vrg, annotations); err != nil {
		log.Info("error creating VRG via ManifestWork during adoption", "error", err, "cluster", cluster)