Skip to content

Commit

Permalink
Merge branch 'master' into mdeng/upstream/namespaced-bfm
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored Dec 1, 2023
2 parents 608c08b + 8ebc21f commit 63f115c
Show file tree
Hide file tree
Showing 3 changed files with 356 additions and 7 deletions.
2 changes: 2 additions & 0 deletions pkg/backup/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const (

// the annotation for store temporary volumeID
AnnTemporaryVolumeID = "temporary/volume-id"
// AnnRestoredVolumeID for store the volume id restored by volume-restore
AnnRestoredVolumeID = "tikv-volume-restore/volume-id"

// These annotations are taken from the Kubernetes persistent volume/persistent volume claim controller.
// They cannot be directly importing because they are part of the kubernetes/kubernetes package, and importing that package is unsupported.
Expand Down
69 changes: 62 additions & 7 deletions pkg/backup/snapshotter/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,35 +207,85 @@ func commitPVsAndPVCsToK8S(
r *v1alpha1.Restore,
pvcs []*corev1.PersistentVolumeClaim,
pvs []*corev1.PersistentVolume) (string, error) {
sel, err := label.New().Instance(r.Spec.BR.Cluster).TiKV().Namespace(r.Namespace).Selector()
clusterNamespace := r.Spec.BR.ClusterNamespace
if clusterNamespace == "" {
clusterNamespace = r.Namespace
}
pvSel, err := label.New().Instance(r.Spec.BR.Cluster).TiKV().Namespace(clusterNamespace).Selector()
if err != nil {
return "BuildTiKVSelectorFailed", err
return "BuildTiKVPvSelectorFailed", err
}
existingPVs, err := deps.PVLister.List(sel)
existingPVs, err := deps.PVLister.List(pvSel)
if err != nil {
return "ListPVsFailed", err
}
refPVCMap := make(map[string]struct{})

refPVC2PVMap := make(map[string]*corev1.PersistentVolume)
for _, pv := range existingPVs {
if pv.Spec.ClaimRef != nil {
refPVCMap[pv.Spec.ClaimRef.Name] = struct{}{}
namespacedName := buildNamespacedName(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
refPVC2PVMap[namespacedName] = pv
}
}
pvcMap := make(map[string]*corev1.PersistentVolumeClaim)
for _, pvc := range pvcs {
pvcMap[pvc.Name] = pvc
}

// Since we may generate random PV names, to avoid creating duplicate PVs,
// we need to check if the PV already exists before creating it.
for _, pv := range pvs {
if pv.Spec.ClaimRef != nil {
if _, ok := refPVCMap[pv.Spec.ClaimRef.Name]; ok {
continue
namespacedName := buildNamespacedName(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
if existingPV, ok := refPVC2PVMap[namespacedName]; ok {
// check the existing pv with same pvc ref if created by this restore
if existingPV.Annotations[constants.AnnRestoredVolumeID] == pv.Annotations[constants.AnnRestoredVolumeID] {
klog.Infof("Restore %s/%s existing pv %s has same volume id %s, skip creating pv %s",
r.Namespace, r.Name, existingPV.Name, existingPV.Annotations[constants.AnnRestoredVolumeID], pv.Name)

// restored pv has already created, bind corresponding pvc with existing pv
pvc := pvcMap[pv.Spec.ClaimRef.Name]
pvc.Spec.VolumeName = existingPV.Name
klog.Infof("Restore %s/%s bind pvc %s with existing pv %s",
r.Namespace, r.Name, pvc.Name, existingPV.Name)
continue
} else {
klog.Warningf("Restore %s/%s finds existing pv %s has same pvc ref with the pv %s we will create. Maybe there is volume leak, please take a look",
r.Namespace, r.Name, existingPV.Name, pv.Name)
}
}
}
if err := deps.PVControl.CreatePV(r, pv); err != nil {
return "CreatePVFailed", err
}
}

pvcSel, err := label.New().Instance(r.Spec.BR.Cluster).TiKV().Selector()
if err != nil {
return "BuildTiKVPvcSelectorFailed", err
}
existingPVCs, err := deps.PVCLister.PersistentVolumeClaims(clusterNamespace).List(pvcSel)
if err != nil {
return "ListPVCsFailed", err
}
existingPVCMap := make(map[string]*corev1.PersistentVolumeClaim, len(existingPVCs))
for _, pvc := range existingPVCs {
existingPVCMap[pvc.Name] = pvc
}

for _, pvc := range pvcs {
if existingPVC, ok := existingPVCMap[pvc.Name]; ok {
// check if the existing pvc is created by this restore
if existingPVC.Spec.VolumeName == pvc.Spec.VolumeName {
klog.Infof("Restore %s/%s the pvc %s is already existing, skip it", r.Namespace, r.Name, pvc.Name)
continue
} else {
return "ExistingPVCConflict", fmt.Errorf(
"pvc %s/%s already exists, and has different volume. please remove it carefully to continue volume restore process",
existingPVC.Namespace, existingPVC.Name)
}
}

if err := deps.PVCControl.CreatePVC(r, pvc); err != nil {
if apierrors.IsAlreadyExists(err) {
continue
Expand All @@ -247,6 +297,10 @@ func commitPVsAndPVCsToK8S(
return "", nil
}

func buildNamespacedName(namespace, name string) string {
return fmt.Sprintf("%s/%s", namespace, name)
}

type CloudSnapBackup struct {
TiKV *TiKVBackup `json:"tikv"`
PD Component `json:"pd"`
Expand Down Expand Up @@ -497,6 +551,7 @@ func (m *StoresMixture) ProcessCSBPVCsAndPVs(r *v1alpha1.Restore, csb *CloudSnap
// Clear out non-core metadata fields and status
resetMetadataAndStatus(r, backupClusterName, pvc, pv)
m.snapshotter.ResetPvAvailableZone(r, pv)
pv.Annotations[constants.AnnRestoredVolumeID] = restoreVolID

pvs = append(pvs, pv)
pvcs = append(pvcs, pvc)
Expand Down
Loading

0 comments on commit 63f115c

Please sign in to comment.