dp: cherry pick dp prs to release-1.4 (#4929)
* feat: support tiflash backup and restore during volume snapshot (#4812)

* feat: calc the backup size from snapshot storage usage (#4819)

* fix backup failure when pod was auto-restarted by k8s (#4883)

* init code for test

* just clean before backup data

* delete test code

* import pingcap/errors

* add check version

* remove test code

* add running status check

* add restart condition to clarify logic

* fix status update

* fix ut

* br: ensure pvc names sequential for ebs restore (#4888)

* BR: Restart backup when backup job/pod unexpectedly failed by k8s (#4895)

* init code for test

* just clean before backup data

* delete test code

* import pingcap/errors

* add check version

* remove test code

* add running status check

* add restart condition to clarify logic

* fix status update

* fix ut

* init code

* update crd reference

* fix miss update retry count

* add retry limit as constant

* init runnable code

* refine main controller logic

* add some note

* address some comments

* init e2e test code

* add e2e env to extend backup time

* add e2e env for test

* fix compile

* just test kill pod

* refine logic

* use pkill to kill pod

* fix reconcile

* add kill pod log

* add more log

* add more log

* try kill pod only

* wait and kill running backup pod

* add wait for pod failed

* fix wait pod running

* use killall backup to kill pod

* use pkill -9 backup

* kill pod until pod is failed

* add ps to debug

* connect commands by semicolon

* kill pod by signal 15

* use panic simulate kill pod

* test all kill pod test

* remove useless log

* add original reason of job or pod failure

* rename BackupRetryFailed to BackupRetryTheFailed

* BR: Auto truncate log backup in backup schedule (#4904)

* init schedule log backup code

* add run log backup code

* update api

* refine some notes

* refine calculation logic

* add ut

* fix make check

* add log backup test

* refine code

* fix notes

* refine function names

* fix conflict

* fix: add a new check for encryption during the volume snapshot restore (#4914)

* br: volume-snapshot may lead to a panic when there is no block change between two snapshots (#4922)

* br: refine BackoffRetryPolicy time format (#4925)

* refine BackoffRetryPolicy time format

* fix some ut

---------

Co-authored-by: fengou1 <[email protected]>
Co-authored-by: WangLe1321 <[email protected]>
3 people committed Mar 11, 2023
1 parent dcc38f3 commit 7f4e089
Showing 43 changed files with 8,752 additions and 205 deletions.
34 changes: 21 additions & 13 deletions cmd/backup-manager/app/backup/backup.go
@@ -27,7 +27,6 @@ import (
"strings"
"time"

"github.com/dustin/go-humanize"
backupUtil "github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/backup/constants"
@@ -71,7 +70,7 @@ func (bo *Options) backupData(
)
localCSBFile := path.Join(util.BRBinPath, "csb_backup.json")
// read cluster meta from external storage and pass it to BR
klog.Infof("read the restore meta from external storage")
klog.Infof("read the cluster meta from external storage")
externalStorage, err := pkgutil.NewStorageBackend(backup.Spec.StorageProvider, &pkgutil.StorageCredential{})
if err != nil {
return err
@@ -92,23 +91,13 @@
logCallback = func(line string) {
if strings.Contains(line, successTag) {
extract := strings.Split(line, successTag)[1]
sizeStr := regexp.MustCompile(`size=(\d+)`).FindString(extract)
size := strings.ReplaceAll(sizeStr, "size=", "")
tsStr := regexp.MustCompile(`resolved_ts=(\d+)`).FindString(extract)
ts := strings.ReplaceAll(tsStr, "resolved_ts=", "")
klog.Infof("%s size: %s, resolved_ts: %s", successTag, size, ts)
klog.Infof("%s resolved_ts: %s", successTag, ts)

backupSize, err := strconv.ParseInt(size, 10, 64)
if err != nil {
klog.Warningf("Failed to parse BackupSize %s, %v", size, err)
}
backupSize = backupSize << 30 // Convert GiB to bytes.
backupSizeReadable := humanize.Bytes(uint64(backupSize))
progress := 100.0
if err := statusUpdater.Update(backup, nil, &controller.BackupUpdateStatus{
CommitTs: &ts,
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
ProgressStep: &progressStep,
Progress: &progress,
ProgressUpdateTime: &metav1.Time{Time: time.Now()},
@@ -285,6 +274,8 @@ func (bo *Options) brCommandRunWithLogCallback(ctx context.Context, fullArgs []s
return fmt.Errorf("cluster %s, wait pipe message failed, errMsg %s, err: %v", bo, errMsg, err)
}

e2eTestSimulate(bo)

klog.Infof("Run br commond %v for cluster %s successfully", fullArgs, bo)
return nil
}
@@ -331,3 +322,20 @@ func (bo *Options) updateProgressFromFile(
}
}
}

// TODO use https://github.com/pingcap/failpoint instead of e2e test env
func e2eTestSimulate(bo *Options) {
if backupUtil.IsE2EExtendBackupTime() {
for i := 0; i < 5*60; i++ {
klog.Infof("simulate br running for backup %s", bo)
time.Sleep(time.Second * 1)
}
}
if backupUtil.IsE2EExtendBackupTimeAndPanic() {
for i := 0; i < 2*60; i++ {
klog.Infof("simulate br running for backup %s", bo)
time.Sleep(time.Second * 1)
}
panic("simulate backup pod unexpected termination for e2e test")
}
}
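
Note on the log-callback change in backup.go above: the size parsing was dropped from the callback (backup size is now derived from snapshot storage usage in manager.go), so the callback only extracts resolved_ts from BR's success line. A minimal standalone sketch of that extraction — the success tag and sample line below are assumptions for illustration, not real BR output:

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// successTag and the sample line are illustrative assumptions only.
	successTag := "EBS backup success"
	line := `[INFO] EBS backup success [resolved_ts=440278912354871297] [size=42]`

	if strings.Contains(line, successTag) {
		extract := strings.Split(line, successTag)[1]
		// FindString returns the whole match ("resolved_ts=440..."),
		// so the prefix is stripped afterwards.
		tsStr := regexp.MustCompile(`resolved_ts=(\d+)`).FindString(extract)
		ts := strings.ReplaceAll(tsStr, "resolved_ts=", "")
		fmt.Printf("%s resolved_ts: %s\n", successTag, ts)
	}
}
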
63 changes: 59 additions & 4 deletions cmd/backup-manager/app/backup/manager.go
@@ -20,7 +20,10 @@ import (
"strconv"
"time"

"github.com/Masterminds/semver"
"github.com/dustin/go-humanize"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/clean"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/constants"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
@@ -92,6 +95,20 @@ func (bm *Manager) ProcessBackup() error {
return errorutils.NewAggregate(errs)
}

// we treat a snapshot backup as restarted if its status is not scheduled when the backup pod just starts to run
// we will clean the backup data before running the br command
if backup.Spec.Mode == v1alpha1.BackupModeSnapshot && (backup.Status.Phase != v1alpha1.BackupScheduled || v1alpha1.IsBackupRestart(backup)) {
klog.Infof("snapshot backup %s was restarted, status is %s", bm, backup.Status.Phase)
uerr := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupRestart,
Status: corev1.ConditionTrue,
}, nil)
if uerr != nil {
errs = append(errs, uerr)
return errorutils.NewAggregate(errs)
}
}

if backup.Spec.BR == nil {
return fmt.Errorf("no br config in %s", bm)
}
@@ -269,6 +286,15 @@ func (bm *Manager) performBackup(ctx context.Context, backup *v1alpha1.Backup, d
}
}

// clean snapshot backup data if it was restarted
if backup.Spec.Mode == v1alpha1.BackupModeSnapshot && v1alpha1.IsBackupRestart(backup) && !bm.isBRCanContinueRunByCheckpoint() {
klog.Infof("clean snapshot backup %s data before run br command, backup path is %s", bm, backup.Status.BackupPath)
err := bm.cleanSnapshotBackupEnv(ctx, backup)
if err != nil {
return errors.Annotatef(err, "clean snapshot backup %s failed", bm)
}
}

// change Prepare to Running before real backup process start
if err := bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupRunning,
@@ -321,11 +347,21 @@
var updateStatus *controller.BackupUpdateStatus
switch bm.Mode {
case string(v1alpha1.BackupModeVolumeSnapshot):
// In volume snapshot mode, commitTS and size have been updated according to the
// br command output, so we don't need to update them here.
// In volume snapshot mode, commitTS has been updated according to the
// br command output, so we don't need to update it here.
backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)

if err != nil {
klog.Warningf("Failed to calc volume snapshot backup size %d bytes, %v", backupSize, err)
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus = &controller.BackupUpdateStatus{
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
TimeStarted: &metav1.Time{Time: started},
TimeCompleted: &metav1.Time{Time: time.Now()},
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
}
default:
backupMeta, err := util.GetBRMetaData(ctx, backup.Spec.StorageProvider)
@@ -527,3 +563,22 @@ func (bm *Manager) truncateLogBackup(ctx context.Context, backup *v1alpha1.Backu
}
return updateStatus, "", nil
}

func (bm *Manager) cleanSnapshotBackupEnv(ctx context.Context, backup *v1alpha1.Backup) error {
if backup.Spec.Mode != v1alpha1.BackupModeSnapshot {
return nil
}

cleanOpt := clean.Options{Namespace: bm.Namespace, BackupName: bm.ResourceName}
return cleanOpt.CleanBRRemoteBackupData(ctx, backup)
}

func (bm *Manager) isBRCanContinueRunByCheckpoint() bool {
v, err := semver.NewVersion(bm.TiKVVersion)
if err != nil {
klog.Errorf("Parse version %s failure, error: %v", bm.TiKVVersion, err)
return false
}
lessThanV651, _ := semver.NewConstraint("<v6.5.1-0")
return !lessThanV651.Check(v)
}
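
The isBRCanContinueRunByCheckpoint gate above decides whether a restarted snapshot backup can rely on BR's checkpoint and skip cleaning remote data: the constraint <v6.5.1-0 (the -0 pre-release suffix keeps pre-release builds below the bound) treats everything from v6.5.1 upward as checkpoint-capable. A small self-contained sketch of the same check, with illustrative input versions:

package main

import (
	"fmt"

	"github.com/Masterminds/semver"
)

// brCanContinueRunByCheckpoint mirrors the version gate above:
// versions below v6.5.1 are treated as unable to resume from a checkpoint,
// so a restarted backup on those versions cleans its remote data first.
func brCanContinueRunByCheckpoint(tikvVersion string) bool {
	v, err := semver.NewVersion(tikvVersion)
	if err != nil {
		// Unparsable version: be conservative and report "cannot continue".
		return false
	}
	lessThanV651, _ := semver.NewConstraint("<v6.5.1-0")
	return !lessThanV651.Check(v)
}

func main() {
	for _, ver := range []string{"v6.5.0", "v6.5.1", "v7.0.0"} {
		fmt.Printf("%s -> %v\n", ver, brCanContinueRunByCheckpoint(ver))
	}
}
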
8 changes: 4 additions & 4 deletions cmd/backup-manager/app/clean/clean.go
@@ -92,7 +92,7 @@ func (bo *Options) deleteSnapshotsAndBackupMeta(ctx context.Context, backup *v1a
return err
}

metaInfo := &util.EBSBasedBRMeta{}
metaInfo := &bkutil.EBSBasedBRMeta{}
if err = json.Unmarshal(contents, metaInfo); err != nil {
klog.Errorf("rclone copy remote backupmeta to local failure.")
return err
@@ -112,7 +112,7 @@ func (bo *Options) deleteSnapshotsAndBackupMeta(ctx context.Context, backup *v1a
return nil
}

func (bo *Options) deleteVolumeSnapshots(meta *util.EBSBasedBRMeta) error {
func (bo *Options) deleteVolumeSnapshots(meta *bkutil.EBSBasedBRMeta) error {
newVolumeIDMap := make(map[string]string)
for i := range meta.TiKVComponent.Stores {
store := meta.TiKVComponent.Stores[i]
@@ -122,7 +122,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *util.EBSBasedBRMeta) error {
}
}

ec2Session, err := util.NewEC2Session(CloudAPIConcurrency)
ec2Session, err := bkutil.NewEC2Session(CloudAPIConcurrency)
if err != nil {
klog.Errorf("new a ec2 session failure.")
return err
@@ -136,7 +136,7 @@ func (bo *Options) deleteVolumeSnapshots(meta *util.EBSBasedBRMeta) error {
}

// cleanBRRemoteBackupData clean the backup data from remote
func (bo *Options) cleanBRRemoteBackupData(ctx context.Context, backup *v1alpha1.Backup) error {
func (bo *Options) CleanBRRemoteBackupData(ctx context.Context, backup *v1alpha1.Backup) error {
opt := backup.GetCleanOption()

backend, err := bkutil.NewStorageBackend(backup.Spec.StorageProvider, &bkutil.StorageCredential{})
79 changes: 78 additions & 1 deletion cmd/backup-manager/app/clean/manager.go
@@ -16,12 +16,15 @@ package clean
import (
"context"
"fmt"
"sort"

"github.com/dustin/go-humanize"
"github.com/pingcap/tidb-operator/cmd/backup-manager/app/util"
"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
listers "github.com/pingcap/tidb-operator/pkg/client/listers/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/controller"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
errorutils "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
)
@@ -72,11 +75,28 @@ func (bm *Manager) performCleanBackup(ctx context.Context, backup *v1alpha1.Back
var errs []error
var err error
// volume-snapshot backup requires to delete the snapshot firstly, then delete the backup meta file
// volume-snapshot is an incremental snapshot per volume. Any backup deletion will take effect on the next volume-snapshot backup
// we need to update the backup size of the impacted volume-snapshot backup.
if backup.Spec.Mode == v1alpha1.BackupModeVolumeSnapshot {
nextNackup := bm.getNextBackup(ctx, backup)
if nextNackup == nil {
klog.Errorf("get next backup for cluster %s backup is nil", bm)
}

// clean backup will delete all vol snapshots
err = bm.cleanBackupMetaWithVolSnapshots(ctx, backup)
if err != nil {
klog.Errorf("delete backup %s for cluster %s backup failure", backup.Name, bm)
}

// update the next backup size
if nextNackup != nil {
bm.updateVolumeSnapshotBackupSize(ctx, nextNackup)
}

} else {
if backup.Spec.BR != nil {
err = bm.cleanBRRemoteBackupData(ctx, backup)
err = bm.CleanBRRemoteBackupData(ctx, backup)
} else {
opts := util.GetOptions(backup.Spec.StorageProvider)
err = bm.cleanRemoteBackupData(ctx, backup.Status.BackupPath, opts)
@@ -102,3 +122,60 @@ func (bm *Manager) performCleanBackup(ctx context.Context, backup *v1alpha1.Back
Status: corev1.ConditionTrue,
}, nil)
}

// getNextBackup gets the next backup sorted by start time
func (bm *Manager) getNextBackup(ctx context.Context, backup *v1alpha1.Backup) *v1alpha1.Backup {
var err error
bks, err := bm.backupLister.Backups(backup.Namespace).List(labels.Everything())
if err != nil {
return nil
}

// sort the backup list by TimeStarted, since volume snapshot is point-in-time (start time) backup
sort.Slice(bks, func(i, j int) bool {
return bks[i].Status.TimeStarted.Before(&bks[j].Status.TimeStarted)
})

for i, bk := range bks {
if backup.Name == bk.Name {
return bm.getVolumeSnapshotBackup(bks[i+1:])
}
}

return nil
}

// getVolumeSnapshotBackup gets the first volume-snapshot backup from the backup list, which may contain non-volume-snapshot backups
func (bm *Manager) getVolumeSnapshotBackup(backups []*v1alpha1.Backup) *v1alpha1.Backup {
for _, bk := range backups {
if bk.Spec.Mode == v1alpha1.BackupModeVolumeSnapshot {
return bk
}
}

// reached the end of the backup list, there are no volume-snapshot backups
return nil
}

// updateVolumeSnapshotBackupSize updates a volume-snapshot backup size
func (bm *Manager) updateVolumeSnapshotBackupSize(ctx context.Context, backup *v1alpha1.Backup) error {
var updateStatus *controller.BackupUpdateStatus

backupSize, err := util.CalcVolSnapBackupSize(ctx, backup.Spec.StorageProvider)

if err != nil {
klog.Warningf("Failed to parse BackupSize %d KB, %v", backupSize, err)
}

backupSizeReadable := humanize.Bytes(uint64(backupSize))

updateStatus = &controller.BackupUpdateStatus{
BackupSize: &backupSize,
BackupSizeReadable: &backupSizeReadable,
}

return bm.StatusUpdater.Update(backup, &v1alpha1.BackupCondition{
Type: v1alpha1.BackupComplete,
Status: corev1.ConditionTrue,
}, updateStatus)
}
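
Because a volume snapshot is incremental per volume, deleting one backup changes the effective size of the next volume-snapshot backup; getNextBackup and updateVolumeSnapshotBackupSize above recompute that size after a deletion. A simplified, self-contained sketch of the next-backup lookup — the backup struct and values here are illustrative stand-ins for the v1alpha1.Backup API objects:

package main

import (
	"fmt"
	"sort"
	"time"
)

type backup struct {
	name        string
	mode        string // "volume-snapshot" or "snapshot"
	timeStarted time.Time
}

// nextVolumeSnapshotBackup sorts backups by start time, locates the deleted
// backup, then returns the first volume-snapshot backup that started after it.
func nextVolumeSnapshotBackup(backups []backup, deleted string) *backup {
	sort.Slice(backups, func(i, j int) bool {
		return backups[i].timeStarted.Before(backups[j].timeStarted)
	})
	for i, bk := range backups {
		if bk.name == deleted {
			for j := i + 1; j < len(backups); j++ {
				if backups[j].mode == "volume-snapshot" {
					return &backups[j]
				}
			}
			return nil
		}
	}
	return nil
}

func main() {
	now := time.Now()
	bks := []backup{
		{"bk-3", "volume-snapshot", now.Add(2 * time.Hour)},
		{"bk-1", "volume-snapshot", now},
		{"bk-2", "snapshot", now.Add(time.Hour)},
	}
	if next := nextVolumeSnapshotBackup(bks, "bk-1"); next != nil {
		fmt.Println("recalculate size for:", next.name) // prints bk-3
	}
}
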