From d38ff89b6be8b32815bcdc00af1d4d53cbfba51c Mon Sep 17 00:00:00 2001
From: BornChanger
Date: Wed, 9 Aug 2023 00:06:08 +0800
Subject: [PATCH] ebs br: add retry support for ListSnapshotBlocks() and ListChangedBlocks()

Wrap every paginated ListSnapshotBlocks()/ListChangedBlocks() request in
retry.OnError so that a request rejected with RequestThrottledException is
retried (about once per second, at most 60 attempts) instead of failing the
whole size calculation. Any other error is returned immediately, annotated
with the snapshot and volume ids. The elapsed time of each calculation is
now logged as well.

Signed-off-by: BornChanger
---
 cmd/backup-manager/app/util/backup_size.go | 136 +++++++++++++++------
 1 file changed, 101 insertions(+), 35 deletions(-)

diff --git a/cmd/backup-manager/app/util/backup_size.go b/cmd/backup-manager/app/util/backup_size.go
index 06be5303e63..8aa39bd71b0 100644
--- a/cmd/backup-manager/app/util/backup_size.go
+++ b/cmd/backup-manager/app/util/backup_size.go
@@ -195,10 +195,16 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, level string
 	return
 }
 
-// calculateSnapshotSize calculate size of an snapshot in bytes by listing its blocks.
+// calculateSnapshotSize calculates the size of a snapshot in bytes by listing its blocks.
 func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
 	var snapshotSize uint64
 	var numApiReq uint64
+
+	start := time.Now()
+
+	klog.Infof("start to calculate snapshot size for %s, volume id %s",
+		snapshotId, volumeId)
+
 	ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
 	if err != nil {
 		klog.Errorf("new a ebs session failure.")
@@ -207,28 +213,57 @@ func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error)
 
 	var nextToken *string
 	for {
-		resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
-			SnapshotId: aws.String(snapshotId),
-			MaxResults: aws.Int64(ListSnapMaxReturnResult),
-			NextToken:  nextToken,
-		})
-		numApiReq += 1
-		if err != nil {
-			return 0, numApiReq, err
+		// Retry a throttled page request about once per second, at most 60 attempts (~1 minute)
+		backoff := wait.Backoff{
+			Duration: time.Second,
+			Steps:    60,
+			Factor:   1.0,
+			Jitter:   0.1,
+		}
+
+		isAllListed := false
+
+		listBlocks := func() error {
+			resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
+				SnapshotId: aws.String(snapshotId),
+				MaxResults: aws.Int64(ListSnapMaxReturnResult),
+				NextToken:  nextToken,
+			})
+			numApiReq += 1
+			if err != nil {
+				return err
+			}
+			if resp.BlockSize != nil {
+				snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)
+			}
+
+			// a nil NextToken means this was the last page
+			if resp.NextToken == nil {
+				isAllListed = true
+			}
+			nextToken = resp.NextToken
+
+			return nil
 		}
-		if resp.BlockSize != nil {
-			snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)
+
+		isRetry := func(err error) bool {
+			return strings.Contains(err.Error(), "RequestThrottledException")
 		}
+		err = retry.OnError(backoff, isRetry, listBlocks)
 
-		// check if there is more to retrieve
-		if resp.NextToken == nil {
+		if err != nil {
+			return 0, numApiReq, errors.Annotatef(err, "ListSnapshotBlocks() failed against snapshot id %s, volume id %s", snapshotId, volumeId)
+		}
+
+		if isAllListed {
 			break
 		}
-		nextToken = resp.NextToken
 	}
 
-	klog.Infof("full snapshot size %s, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s",
-		humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId)
+	elapsed := time.Since(start)
+
+	klog.Infof("full snapshot size %s, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s, takes %v",
+		humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId, elapsed)
 
 	return snapshotSize, numApiReq, nil
 }
@@ -239,6 +274,8 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin
 	var snapshotSize uint64
 	var numApiReq uint64
 
+	start := time.Now()
+
 	klog.Infof("start to calculate incremental snapshot size for %s, base on prev snapshot %s, volume id %s",
 		snapshotId, preSnapshotId, volumeId)
 
@@ -251,34 +288,63 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin
 
 	var nextToken *string
 	for {
-		resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
-			FirstSnapshotId:  aws.String(preSnapshotId),
-			MaxResults:       aws.Int64(ListSnapMaxReturnResult),
-			SecondSnapshotId: aws.String(snapshotId),
-			NextToken:        nextToken,
-		})
-		numApiReq += 1
-		if err != nil {
-			return 0, numApiReq, err
+		// Retry a throttled page request about once per second, at most 60 attempts (~1 minute)
+		backoff := wait.Backoff{
+			Duration: time.Second,
+			Steps:    60,
+			Factor:   1.0,
+			Jitter:   0.1,
 		}
-		numBlocks += len(resp.ChangedBlocks)
 
-		// retrieve only changed block and blocks only existed in current snapshot (new add blocks)
-		for _, block := range resp.ChangedBlocks {
-			if block.SecondBlockToken != nil && resp.BlockSize != nil {
-				snapshotSize += uint64(*resp.BlockSize)
+		isAllChangeListed := false
+
+		listChangeBlocks := func() error {
+			resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
+				FirstSnapshotId:  aws.String(preSnapshotId),
+				MaxResults:       aws.Int64(ListSnapMaxReturnResult),
+				SecondSnapshotId: aws.String(snapshotId),
+				NextToken:        nextToken,
+			})
+			numApiReq += 1
+			if err != nil {
+				return err
 			}
+			numBlocks += len(resp.ChangedBlocks)
+
+			// count only blocks that carry a token in the current (second) snapshot: changed or newly added
+			for _, block := range resp.ChangedBlocks {
+				if block.SecondBlockToken != nil && resp.BlockSize != nil {
+					snapshotSize += uint64(*resp.BlockSize)
+				}
+			}
+
+			// a nil NextToken means this was the last page
+			if resp.NextToken == nil {
+				isAllChangeListed = true
+			}
+			nextToken = resp.NextToken
+
+			return nil
 		}
 
-		// check if there is more to retrieve
-		if resp.NextToken == nil {
+		isRetry := func(err error) bool {
+			return strings.Contains(err.Error(), "RequestThrottledException")
+		}
+		err = retry.OnError(backoff, isRetry, listChangeBlocks)
+
+		if err != nil {
+			return 0, numApiReq, errors.Annotatef(err, "ListChangedBlocks() failed against volume id %s, preSnapshot id %s, snapshot id %s", volumeId, preSnapshotId, snapshotId)
+		}
+
+		if isAllChangeListed {
 			break
 		}
-		nextToken = resp.NextToken
 	}
 
-	klog.Infof("incremental snapshot size %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s",
-		humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId)
+	elapsed := time.Since(start)
+
+	klog.Infof("incremental snapshot size %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s, takes %v",
+		humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId, elapsed)
 
 	return snapshotSize, numApiReq, nil
 }