Merge pull request #7504 from allenxu404/pv-patch-in-finalizing-phase
Patch newly dynamically provisioned PVs with volume info to restore the PVs' custom settings
blackpiglet authored Mar 20, 2024
2 parents 922653e + 67b5e82 commit 67c06e6
Showing 10 changed files with 569 additions and 20 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/7504-allenxu404
@@ -0,0 +1 @@
Patch newly dynamically provisioned PVs with volume info to restore custom settings of PVs
6 changes: 6 additions & 0 deletions pkg/builder/persistent_volume_builder.go
@@ -108,3 +108,9 @@ func (b *PersistentVolumeBuilder) NodeAffinityRequired(req *corev1api.NodeSelect
}
return b
}

// Phase sets the PersistentVolume's phase.
func (b *PersistentVolumeBuilder) Phase(phase corev1api.PersistentVolumePhase) *PersistentVolumeBuilder {
b.object.Status.Phase = phase
return b
}
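// Illustrative only: a test fixture could chain the new Phase setter with the
// builder's existing helpers (ForPersistentVolume, ReclaimPolicy, and Result
// are assumed to exist in this package; the PV name is made up):
//
//	pv := builder.ForPersistentVolume("test-pv").
//		ReclaimPolicy(corev1api.PersistentVolumeReclaimDelete).
//		Phase(corev1api.VolumeBound).
//		Result()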
1 change: 1 addition & 0 deletions pkg/cmd/server/server.go
@@ -997,6 +997,7 @@ func (s *server) runControllers(defaultVolumeSnapshotLocations map[string]string
newPluginManager,
backupStoreGetter,
s.metrics,
s.crClient,
).SetupWithManager(s.mgr); err != nil {
s.logger.Fatal(err, "unable to create controller", "controller", controller.RestoreFinalizer)
}
187 changes: 184 additions & 3 deletions pkg/controller/restore_finalizer_controller.go
@@ -18,24 +18,36 @@ package controller

import (
"context"
"fmt"
"regexp"
"sync"
"time"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/clock"

internalVolume "github.com/vmware-tanzu/velero/internal/volume"
velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/persistence"
"github.com/vmware-tanzu/velero/pkg/plugin/clientmgmt"
"github.com/vmware-tanzu/velero/pkg/restore"
kubeutil "github.com/vmware-tanzu/velero/pkg/util/kube"
"github.com/vmware-tanzu/velero/pkg/util/results"
)

const (
PVPatchMaximumDuration = 10 * time.Minute
)

type restoreFinalizerReconciler struct {
client.Client
namespace string
@@ -44,6 +56,7 @@ type restoreFinalizerReconciler struct
backupStoreGetter persistence.ObjectBackupStoreGetter
metrics *metrics.ServerMetrics
clock clock.WithTickerAndDelayedExecution
crClient client.Client
}

func NewRestoreFinalizerReconciler(
@@ -53,6 +66,7 @@ func NewRestoreFinalizerReconciler(
newPluginManager func(logrus.FieldLogger) clientmgmt.Manager,
backupStoreGetter persistence.ObjectBackupStoreGetter,
metrics *metrics.ServerMetrics,
crClient client.Client,
) *restoreFinalizerReconciler {
return &restoreFinalizerReconciler{
Client: client,
@@ -62,6 +76,7 @@ func NewRestoreFinalizerReconciler(
backupStoreGetter: backupStoreGetter,
metrics: metrics,
clock: &clock.RealClock{},
crClient: crClient,
}
}

@@ -123,7 +138,27 @@ func (r *restoreFinalizerReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, errors.Wrap(err, "error getting backup store")
}

volumeInfo, err := backupStore.GetBackupVolumeInfos(restore.Spec.BackupName)
if err != nil {
log.WithError(err).Errorf("error getting volumeInfo for backup %s", restore.Spec.BackupName)
return ctrl.Result{}, errors.Wrap(err, "error getting volumeInfo")
}

restoredResourceList, err := backupStore.GetRestoredResourceList(restore.Name)
if err != nil {
log.WithError(err).Error("error getting restoredResourceList")
return ctrl.Result{}, errors.Wrap(err, "error getting restoredResourceList")
}

restoredPVCList := getRestoredPVCFromRestoredResourceList(restoredResourceList)

finalizerCtx := &finalizerContext{
logger: log,
restore: restore,
crClient: r.crClient,
volumeInfo: volumeInfo,
restoredPVCList: restoredPVCList,
}
warnings, errs := finalizerCtx.execute()

warningCnt := len(warnings.Velero) + len(warnings.Cluster)
@@ -200,14 +235,160 @@ func (r *restoreFinalizerReconciler) finishProcessing(restorePhase velerov1api.R
// finalizerContext includes all the dependencies required by finalization tasks and
// a function execute() to orderly implement task logic.
type finalizerContext struct {
logger logrus.FieldLogger
restore *velerov1api.Restore
crClient client.Client
volumeInfo []*internalVolume.VolumeInfo
restoredPVCList map[string]struct{}
}

func (ctx *finalizerContext) execute() (results.Result, results.Result) { //nolint:unparam //temporarily ignore the lint report: result 0 is always nil (unparam)
warnings, errs := results.Result{}, results.Result{}

// implement finalization tasks
pdpErrs := ctx.patchDynamicPVWithVolumeInfo()
errs.Merge(&pdpErrs)

return warnings, errs
}

// patchDynamicPVWithVolumeInfo patches newly dynamically provisioned PVs using volume info
// in order to restore custom settings that would otherwise be lost during dynamic PV recreation.
func (ctx *finalizerContext) patchDynamicPVWithVolumeInfo() (errs results.Result) {
ctx.logger.Info("patching newly dynamically provisioned PV starts")

var pvWaitGroup sync.WaitGroup
var resultLock sync.Mutex
maxConcurrency := 3
semaphore := make(chan struct{}, maxConcurrency)

for _, volumeItem := range ctx.volumeInfo {
if (volumeItem.BackupMethod == internalVolume.PodVolumeBackup || volumeItem.BackupMethod == internalVolume.CSISnapshot) && volumeItem.PVInfo != nil {
// Determine restored PVC namespace
restoredNamespace := volumeItem.PVCNamespace
if remapped, ok := ctx.restore.Spec.NamespaceMapping[restoredNamespace]; ok {
restoredNamespace = remapped
}

// Check if PVC was restored in previous phase
pvcKey := fmt.Sprintf("%s/%s", restoredNamespace, volumeItem.PVCName)
if _, restored := ctx.restoredPVCList[pvcKey]; !restored {
continue
}

pvWaitGroup.Add(1)
go func(volInfo internalVolume.VolumeInfo, restoredNamespace string) {
defer pvWaitGroup.Done()

semaphore <- struct{}{}

log := ctx.logger.WithField("PVC", volInfo.PVCName).WithField("PVCNamespace", restoredNamespace)
log.Debug("patching dynamic PV is in progress")

err := wait.PollUntilContextTimeout(context.Background(), 10*time.Second, PVPatchMaximumDuration, true, func(context.Context) (bool, error) {
// wait for PVC to be bound
pvc := &v1.PersistentVolumeClaim{}
err := ctx.crClient.Get(context.Background(), client.ObjectKey{Name: volInfo.PVCName, Namespace: restoredNamespace}, pvc)
if apierrors.IsNotFound(err) {
log.Debug("PVC not found, retrying")
return false, nil
}
if err != nil {
return false, err
}

if pvc.Status.Phase != v1.ClaimBound || pvc.Spec.VolumeName == "" {
log.Debugf("PVC: %s not ready", pvc.Name)
return false, nil
}

// wait for PV to be bound
pvName := pvc.Spec.VolumeName
pv := &v1.PersistentVolume{}
err = ctx.crClient.Get(context.Background(), client.ObjectKey{Name: pvName}, pv)
if apierrors.IsNotFound(err) {
log.Debugf("PV %s not found, retrying", pvName)
return false, nil
}
if err != nil {
return false, err
}

if pv.Spec.ClaimRef == nil || pv.Status.Phase != v1.VolumeBound {
log.Debugf("PV: %s not ready", pvName)
return false, nil
}

// validate PV
if pv.Spec.ClaimRef.Name != pvc.Name || pv.Spec.ClaimRef.Namespace != restoredNamespace {
return false, fmt.Errorf("PV was bound by unexpected PVC %s/%s, expected PVC %s/%s",
pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name, restoredNamespace, pvc.Name)
}

// patch PV's reclaim policy and label using the corresponding data stored in volume info
if needPatch(pv, volInfo.PVInfo) {
updatedPV := pv.DeepCopy()
updatedPV.Labels = volInfo.PVInfo.Labels
updatedPV.Spec.PersistentVolumeReclaimPolicy = v1.PersistentVolumeReclaimPolicy(volInfo.PVInfo.ReclaimPolicy)
if err := kubeutil.PatchResource(pv, updatedPV, ctx.crClient); err != nil {
return false, err
}
log.Infof("newly dynamically provisioned PV %s has been patched using volume info", pvName)
}

return true, nil
})

if err != nil {
err = fmt.Errorf("failed to patch dynamic PV, err: %s, PVC: %s, PV: %s", err, volInfo.PVCName, volInfo.PVName)
ctx.logger.WithError(errors.WithStack(err)).Error("error patching dynamic PV using volume info")
resultLock.Lock()
defer resultLock.Unlock()
errs.Add(restoredNamespace, err)
}

<-semaphore
}(*volumeItem, restoredNamespace)
}
}

pvWaitGroup.Wait()
ctx.logger.Info("patching newly dynamically provisioned PV ends")

return errs
}
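// The fan-out above bounds concurrency with a buffered channel used as a
// semaphore; a stripped-down sketch of the same pattern (process and items
// are placeholders, not part of this package):
//
//	sem := make(chan struct{}, 3) // at most 3 goroutines run at once
//	var wg sync.WaitGroup
//	for _, item := range items {
//		wg.Add(1)
//		go func(it string) {
//			defer wg.Done()
//			sem <- struct{}{} // acquire a slot
//			process(it)
//			<-sem // release the slot
//		}(item)
//	}
//	wg.Wait()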

func getRestoredPVCFromRestoredResourceList(restoredResourceList map[string][]string) map[string]struct{} {
pvcKey := "v1/PersistentVolumeClaim"
pvcList := make(map[string]struct{})

for _, pvc := range restoredResourceList[pvcKey] {
// each pvc string in restoredResourceList has the format "namespace/pvcName(status)"
// extract the substring before "(created)" if the status in the rightmost parentheses is "created"
r := regexp.MustCompile(`\(([^)]+)\)`)
matches := r.FindAllStringSubmatch(pvc, -1)
if len(matches) > 0 && matches[len(matches)-1][1] == restore.ItemRestoreResultCreated {
pvcList[pvc[:len(pvc)-len("(created)")]] = struct{}{}
}
}

return pvcList
}
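// Illustrative input/output (names are made up): only entries whose rightmost
// parenthesized status is "created" survive, keyed as "namespace/name":
//
//	list := map[string][]string{
//		"v1/PersistentVolumeClaim": {
//			"ns-1/pvc-1(created)", // kept
//			"ns-2/pvc-2(updated)", // skipped: not newly created
//		},
//	}
//	restored := getRestoredPVCFromRestoredResourceList(list)
//	// restored now holds only the key "ns-1/pvc-1"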

func needPatch(newPV *v1.PersistentVolume, pvInfo *internalVolume.PVInfo) bool {
if newPV.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimPolicy(pvInfo.ReclaimPolicy) {
return true
}

newPVLabels, pvLabels := newPV.Labels, pvInfo.Labels
for k, v := range pvLabels {
if _, ok := newPVLabels[k]; !ok {
return true
}
if newPVLabels[k] != v {
return true
}
}

return false
}
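// Illustrative only (values are made up): a provisioner typically recreates a
// dynamic PV with its StorageClass defaults, so a Retain policy recorded in
// the backed-up PVInfo differs from the fresh PV's Delete policy and the PV
// gets patched:
//
//	pv := &v1.PersistentVolume{
//		Spec: v1.PersistentVolumeSpec{
//			PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimDelete,
//		},
//	}
//	info := &internalVolume.PVInfo{ReclaimPolicy: string(v1.PersistentVolumeReclaimRetain)}
//	needPatch(pv, info) // true: the reclaim policy drifted from the backup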