Skip to content

Commit

Permalink
Fix Failover Confusion in DRPC Action Post Hub Recovery
Browse files Browse the repository at this point in the history
If the DRPC Action is set to Failover, for instance, transitioning from C1 to C2, a subsequent
failover request from C2 to C1 creates confusion in Ramen post hub recovery because the action
didn't change and the only thing that changed is the destination cluster. The solution is to
permit failover from C2 to C1 if C1 is accessible.

Signed-off-by: Benamar Mekhissi <[email protected]>
  • Loading branch information
Benamar Mekhissi committed Jan 23, 2024
1 parent e544937 commit a25db1d
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 68 deletions.
4 changes: 4 additions & 0 deletions api/v1alpha1/drplacementcontrol_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ type DRState string

// These are the valid values for DRState
const (
// WaitForUser, state recorded in DRPC status to indicate that we are
// waiting for the user to take an action after hub recover.
WaitForUser = DRState("WaitForUser")

// Initiating, state recorded in the DRPC status to indicate that this
// action (Deploy/Failover/Relocate) is preparing for execution. There
// is NO follow up state called 'Initiated'
Expand Down
1 change: 1 addition & 0 deletions controllers/drplacementcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2335,6 +2335,7 @@ func (d *DRPCInstance) setConditionOnInitialDeploymentCompletion() {

func (d *DRPCInstance) setStatusInitiating() {
if !(d.instance.Status.Phase == "" ||
d.instance.Status.Phase == rmn.WaitForUser ||

Check failure on line 2338 in controllers/drplacementcontrol.go

View workflow job for this annotation

GitHub Actions / Build image

undefined: rmn.WaitForUser

Check failure on line 2338 in controllers/drplacementcontrol.go

View workflow job for this annotation

GitHub Actions / Unit tests

undefined: rmn.WaitForUser
d.instance.Status.Phase == rmn.Deployed ||
d.instance.Status.Phase == rmn.FailedOver ||
d.instance.Status.Phase == rmn.Relocated) {
Expand Down
103 changes: 61 additions & 42 deletions controllers/drplacementcontrol_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,8 @@ func (r *DRPlacementControlReconciler) createDRPCInstance(
placementObj client.Object,
log logr.Logger,
) (*DRPCInstance, error) {
log.Info("Creating DRPC instance")

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2168,8 +2170,10 @@ func (r *DRPlacementControlReconciler) ensureDRPCStatusConsistency(
) (bool, error) {
requeue := true

log.Info("Ensure DRPC Status Consistency")

// This will always be false the first time the DRPC resource is first created OR after hub recovery
if drpc.Status.Phase != "" {
if drpc.Status.Phase != "" && drpc.Status.Phase != rmn.WaitForUser {

Check failure on line 2176 in controllers/drplacementcontrol_controller.go

View workflow job for this annotation

GitHub Actions / Build image

undefined: rmn.WaitForUser

Check failure on line 2176 in controllers/drplacementcontrol_controller.go

View workflow job for this annotation

GitHub Actions / Unit tests

undefined: rmn.WaitForUser
return !requeue, nil
}

Expand All @@ -2178,7 +2182,10 @@ func (r *DRPlacementControlReconciler) ensureDRPCStatusConsistency(
dstCluster = drpc.Spec.FailoverCluster
}

progress, err := r.determineDRPCState(ctx, drpc, drPolicy, placementObj, dstCluster, log)
progress, msg, err := r.determineDRPCState(ctx, drpc, drPolicy, placementObj, dstCluster, log)

log.Info(msg)

if err != nil {
return requeue, err
}
Expand All @@ -2187,22 +2194,23 @@ func (r *DRPlacementControlReconciler) ensureDRPCStatusConsistency(
case Continue:
return !requeue, nil
case AllowFailover:
drpc.Status.Phase = rmn.WaitForUser

Check failure on line 2197 in controllers/drplacementcontrol_controller.go

View workflow job for this annotation

GitHub Actions / Build image

undefined: rmn.WaitForUser

Check failure on line 2197 in controllers/drplacementcontrol_controller.go

View workflow job for this annotation

GitHub Actions / Unit tests

undefined: rmn.WaitForUser
updateDRPCProgression(drpc, rmn.ProgressionActionPaused, log)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionAvailable,
drpc.Generation, metav1.ConditionTrue, rmn.ReasonSuccess, "Failover allowed")
drpc.Generation, metav1.ConditionTrue, rmn.ReasonSuccess, msg)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionPeerReady, drpc.Generation,
metav1.ConditionTrue, rmn.ReasonSuccess, "Failover allowed")

return requeue, nil
default:
msg := "Operation Paused - User Intervention Required."
msg := fmt.Sprintf("Operation Paused - User Intervention Required. %s", msg)

log.Info(fmt.Sprintf("err:%v - msg:%s", err, msg))
log.Info(msg)
updateDRPCProgression(drpc, rmn.ProgressionActionPaused, log)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionAvailable,
drpc.Generation, metav1.ConditionFalse, rmn.ReasonPaused, msg)
addOrUpdateCondition(&drpc.Status.Conditions, rmn.ConditionPeerReady, drpc.Generation,
metav1.ConditionFalse, rmn.ReasonPaused, msg)
metav1.ConditionFalse, rmn.ReasonPaused, "User Intervention Required")

return requeue, nil
}
Expand Down Expand Up @@ -2249,41 +2257,49 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
placementObj client.Object,
dstCluster string,
log logr.Logger,
) (Progress, error) {
) (Progress, string, error) {
log.Info("Rebuild DRPC state")

vrgNamespace, err := selectVRGNamespace(r.Client, log, drpc, placementObj)
if err != nil {
log.Info("Failed to select VRG namespace")

return Stop, err
return Stop, "", err
}

drClusters, err := getDRClusters(ctx, r.Client, drPolicy)
if err != nil {
return Stop, err
return Stop, "", err
}

vrgs, successfullyQueriedClusterCount, failedCluster, err := getVRGsFromManagedClusters(
r.MCVGetter, drpc, drClusters, vrgNamespace, log)
if err != nil {
log.Info("Failed to get a list of VRGs")

return Stop, err
return Stop, "", err
}

// IF 2 clusters queried, and both queries failed, then STOP
if successfullyQueriedClusterCount == 0 {
log.Info("Number of clusters queried is 0. Stop...")
msg := "Stop - Number of clusters queried is 0"

return Stop, nil
return Stop, msg, nil
}

// IF 2 clusters queried successfully and no VRGs, then continue with initial deployment
if successfullyQueriedClusterCount == 2 && len(vrgs) == 0 {
log.Info("Queried 2 clusters successfully")

return Continue, nil
return Continue, "", nil
}

if drpc.Status.Phase == rmn.WaitForUser &&

Check failure on line 2297 in controllers/drplacementcontrol_controller.go

View workflow job for this annotation

GitHub Actions / Build image

undefined: rmn.WaitForUser

Check failure on line 2297 in controllers/drplacementcontrol_controller.go

View workflow job for this annotation

GitHub Actions / Unit tests

undefined: rmn.WaitForUser
drpc.Spec.Action == rmn.ActionFailover &&
drpc.Spec.FailoverCluster != failedCluster {
log.Info("Continue. The action is failover and the failoverCluster is accessible")

return Continue, "", nil
}

// IF queried 2 clusters queried, 1 failed and 0 VRG found, then check s3 store.
Expand All @@ -2297,35 +2313,36 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
if vrg == nil {
// IF the failed cluster is not the dest cluster, then this could be an initial deploy
if failedCluster != dstCluster {
return Continue, nil
return Continue, "", nil
}

log.Info("Unable to query all clusters and failed to get VRG from s3 store")
msg := fmt.Sprintf("Unable to query all clusters and failed to get VRG from s3 store. Failed to query %s",
failedCluster)

return Stop, nil
return Stop, msg, nil
}

log.Info("VRG From s3", "VRG Spec", vrg.Spec, "VRG Annotations", vrg.GetAnnotations())
log.Info("Got VRG From s3", "VRG Spec", vrg.Spec, "VRG Annotations", vrg.GetAnnotations())

if drpc.Spec.Action != rmn.DRAction(vrg.Spec.Action) {
log.Info(fmt.Sprintf("Two different actions - drpc action is '%s'/vrg action from s3 is '%s'",
drpc.Spec.Action, vrg.Spec.Action))
msg := fmt.Sprintf("Failover is allowed - Two different actions - drpcAction is '%s' and vrgAction from s3 is '%s'",
drpc.Spec.Action, vrg.Spec.Action)

return AllowFailover, nil
return AllowFailover, msg, nil
}

if dstCluster == vrg.GetAnnotations()[DestinationClusterAnnotationKey] &&
dstCluster != failedCluster {
log.Info(fmt.Sprintf("VRG from s3. Same dstCluster %s/%s. Proceeding...",
dstCluster, vrg.GetAnnotations()[DestinationClusterAnnotationKey]))

return Continue, nil
return Continue, "", nil
}

log.Info(fmt.Sprintf("VRG from s3. DRPCAction/vrgAction/DRPCDstClstr/vrgDstClstr %s/%s/%s/%s. Allow Failover...",
drpc.Spec.Action, vrg.Spec.Action, dstCluster, vrg.GetAnnotations()[DestinationClusterAnnotationKey]))
msg := fmt.Sprintf("Failover is allowed - drpcAction:'%s'. vrgAction:'%s'. DRPCDstClstr:'%s'. vrgDstClstr:'%s'.",
drpc.Spec.Action, vrg.Spec.Action, dstCluster, vrg.GetAnnotations()[DestinationClusterAnnotationKey])

return AllowFailover, nil
return AllowFailover, msg, nil
}

// IF 2 clusters queried, 1 failed and 1 VRG found on the failover cluster, then check the action, if they don't
Expand All @@ -2341,25 +2358,26 @@ func (r *DRPlacementControlReconciler) determineDRPCState(
break
}

if drpc.Spec.Action != rmn.DRAction(vrg.Spec.Action) {
log.Info(fmt.Sprintf("Stop! Two different actions - drpc action is '%s'/vrg action is '%s'",
drpc.Spec.Action, vrg.Spec.Action))
if drpc.Spec.Action != rmn.DRAction(vrg.Spec.Action) &&
dstCluster == clusterName {
msg := fmt.Sprintf("Stop - Two different actions for the same cluster - drpcAction:'%s'. vrgAction:'%s'",
drpc.Spec.Action, vrg.Spec.Action)

return Stop, nil
return Stop, msg, nil
}

if dstCluster != clusterName && vrg.Spec.ReplicationState == rmn.Secondary {
log.Info(fmt.Sprintf("Same Action and dstCluster and ReplicationState %s/%s/%s",
log.Info(fmt.Sprintf("Failover is allowed. Action/dstCluster/ReplicationState %s/%s/%s",
drpc.Spec.Action, dstCluster, vrg.Spec.ReplicationState))

log.Info("Failover is allowed - Primary is assumed in the failed cluster")
msg := "Failover is allowed - Primary is assumed to be on the failed cluster"

return AllowFailover, nil
return AllowFailover, msg, nil
}

log.Info("Allow to continue")
log.Info("Same action, dstCluster, and ReplicationState is primary. Continuing")

return Continue, nil
return Continue, "", nil
}

// Finally, IF 2 clusters queried successfully and 1 or more VRGs found, and if one of the VRGs is on the dstCluster,
Expand All @@ -2379,25 +2397,26 @@ func (r *DRPlacementControlReconciler) determineDRPCState(

// This can happen if a hub is recovered in the middle of a Relocate
if vrg.Spec.ReplicationState == rmn.Secondary && len(vrgs) == 2 {
log.Info("Both VRGs are in secondary state")
msg := "Stop - Both VRGs have the same secondary state"

return Stop, nil
return Stop, msg, nil
}

if drpc.Spec.Action == rmn.DRAction(vrg.Spec.Action) && dstCluster == clusterName {
log.Info(fmt.Sprintf("Same Action %s", drpc.Spec.Action))
log.Info(fmt.Sprintf("Same Action and dest cluster %s/%s", drpc.Spec.Action, dstCluster))

return Continue, nil
return Continue, "", nil
}

log.Info("Failover is allowed", "vrgs count", len(vrgs), "drpc action",
drpc.Spec.Action, "vrg action", vrg.Spec.Action, "dstCluster/clusterName", dstCluster+"/"+clusterName)
msg := fmt.Sprintf("Failover is allowed - VRGs count:'%d'. drpcAction:'%s'."+
" vrgAction:'%s'. DstCluster:'%s'. vrgOnCluste '%s'",
len(vrgs), drpc.Spec.Action, vrg.Spec.Action, dstCluster, clusterName)

return AllowFailover, nil
return AllowFailover, msg, nil
}

// IF none of the above, then allow failover (set PeerReady), but stop until someone makes the change
log.Info("Failover is allowed, but user intervention is required")
msg := "Failover is allowed - User intervention is required"

return AllowFailover, nil
return AllowFailover, msg, nil
}
Loading

0 comments on commit a25db1d

Please sign in to comment.