Commit

ebs br: add retry support for ListSnapshotBlocks() and ListChangedBlocks()

Signed-off-by: BornChanger <[email protected]>
BornChanger committed Aug 8, 2023
1 parent ddc83e6 commit d38ff89
Showing 1 changed file with 101 additions and 35 deletions.
136 changes: 101 additions & 35 deletions cmd/backup-manager/app/util/backup_size.go
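The change applies the same pattern to both EBS list APIs: wrap each paginated call in client-go's retry.OnError with a roughly constant one-second backoff. The condensed sketch below is illustrative only and is not part of the commit; fetchPage and the page type are hypothetical stand-ins for the real ListSnapshotBlocks/ListChangedBlocks calls.

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/retry"
)

// page is a hypothetical stand-in for an EBS list response (blocks plus pagination token).
type page struct {
    blocks    int
    nextToken *string
}

// fetchPage is a hypothetical stand-in for ListSnapshotBlocks / ListChangedBlocks.
func fetchPage(token *string) (*page, error) {
    if token == nil {
        next := "page-2"
        return &page{blocks: 100, nextToken: &next}, nil
    }
    return &page{blocks: 42, nextToken: nil}, nil // last page
}

func main() {
    // Roughly one-second intervals, at most 60 attempts per page (~1 minute), as in the commit.
    backoff := wait.Backoff{Duration: time.Second, Steps: 60, Factor: 1.0, Jitter: 0.1}

    var total int
    var nextToken *string
    for {
        var resp *page
        // Retry the page fetch; the predicate decides which errors are worth retrying.
        err := retry.OnError(backoff,
            func(err error) bool { return true }, // retry any error in this sketch
            func() error {
                var err error
                resp, err = fetchPage(nextToken)
                return err
            })
        if err != nil {
            fmt.Println("giving up:", err)
            return
        }
        total += resp.blocks
        if resp.nextToken == nil {
            break
        }
        nextToken = resp.nextToken
    }
    fmt.Println("total blocks:", total)
}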
@@ -195,10 +195,16 @@ func calcBackupSize(ctx context.Context, volumes map[string]string, level string
return
}

// calculateSnapshotSize calculates the size of a snapshot in bytes by listing its blocks.
func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error) {
    var snapshotSize uint64
    var numApiReq uint64

    start := time.Now()

    klog.Infof("start to calculate full snapshot size for snapshot %s, volume id %s",
        snapshotId, volumeId)

    ebsSession, err := util.NewEBSSession(util.CloudAPIConcurrency)
    if err != nil {
        klog.Errorf("failed to create new EBS session")
@@ -207,28 +213,57 @@ func calculateSnapshotSize(volumeId, snapshotId string) (uint64, uint64, error)

    var nextToken *string
    for {
        // Each retry interval is around 1 second, with at most 60 retries (~1 minute in total)
        backoff := wait.Backoff{
            Duration: time.Second,
            Steps:    60,
            Factor:   1.0,
            Jitter:   0.1,
        }

        isAllListed := false

        listBlocks := func() error {
            resp, err := ebsSession.EBS.ListSnapshotBlocks(&ebs.ListSnapshotBlocksInput{
                SnapshotId: aws.String(snapshotId),
                MaxResults: aws.Int64(ListSnapMaxReturnResult),
                NextToken:  nextToken,
            })
            numApiReq += 1
            if err != nil {
                return err
            }
            if resp.BlockSize != nil {
                snapshotSize += uint64(len(resp.Blocks)) * uint64(*resp.BlockSize)
            }

            // check if there is more to retrieve
            if resp.NextToken == nil {
                isAllListed = true
            }
            nextToken = resp.NextToken

            return nil
        }

        isRetry := func(err error) bool {
            return !strings.Contains(err.Error(), "RequestThrottledException")
        }
        err = retry.OnError(backoff, isRetry, listBlocks)

        if err != nil {
            return 0, numApiReq, errors.Annotatef(err, "ListSnapshotBlocks() failed against snapshot id %s, volume id %s", snapshotId, volumeId)
        }

        if isAllListed {
            break
        }
    }

    elapsed := time.Since(start)

    klog.Infof("full snapshot size %s, num of ListSnapshotBlocks request %d, snapshot id %s, volume id %s, takes %v",
        humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId, elapsed)

    return snapshotSize, numApiReq, nil
}
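As a side note on the retry semantics used above (not part of the commit): retry.OnError calls the wrapped function repeatedly, sleeping one Backoff step between attempts, but only while the predicate returns true for the returned error; an error the predicate rejects is returned immediately. With Factor 1.0 the steps stay near the configured Duration, so the settings above allow up to 60 roughly one-second attempts. A minimal, self-contained demonstration:

package main

import (
    "errors"
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/util/retry"
)

var errTransient = errors.New("transient failure")

func main() {
    // Short intervals so the demo finishes quickly; the commit uses Duration: time.Second, Steps: 60.
    backoff := wait.Backoff{Duration: 50 * time.Millisecond, Steps: 5, Factor: 1.0, Jitter: 0.1}

    attempts := 0
    err := retry.OnError(backoff,
        // Predicate: retry only errors we consider transient.
        func(err error) bool { return errors.Is(err, errTransient) },
        func() error {
            attempts++
            if attempts < 3 {
                return errTransient // retried after ~50ms
            }
            return nil // succeeds on the third attempt
        })

    fmt.Printf("attempts=%d err=%v\n", attempts, err) // attempts=3 err=<nil>
}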
@@ -239,6 +274,8 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin
    var snapshotSize uint64
    var numApiReq uint64

    start := time.Now()

    klog.Infof("start to calculate incremental snapshot size for %s, based on prev snapshot %s, volume id %s",
        snapshotId, preSnapshotId, volumeId)

@@ -251,34 +288,63 @@ func calculateChangedBlocksSize(volumeId, preSnapshotId, snapshotId string) (uin
    var nextToken *string

    for {
        // Each retry interval is around 1 second, with at most 60 retries (~1 minute in total)
        backoff := wait.Backoff{
            Duration: time.Second,
            Steps:    60,
            Factor:   1.0,
            Jitter:   0.1,
        }

        isAllChangeListed := false

        listChangeBlocks := func() error {
            resp, err := ebsSession.EBS.ListChangedBlocks(&ebs.ListChangedBlocksInput{
                FirstSnapshotId:  aws.String(preSnapshotId),
                MaxResults:       aws.Int64(ListSnapMaxReturnResult),
                SecondSnapshotId: aws.String(snapshotId),
                NextToken:        nextToken,
            })
            numApiReq += 1
            if err != nil {
                return err
            }
            numBlocks += len(resp.ChangedBlocks)

            // count only changed blocks and blocks that exist only in the current snapshot (newly added blocks)
            for _, block := range resp.ChangedBlocks {
                if block.SecondBlockToken != nil && resp.BlockSize != nil {
                    snapshotSize += uint64(*resp.BlockSize)
                }
            }

            // check if there is more to retrieve
            if resp.NextToken == nil {
                isAllChangeListed = true
            }
            nextToken = resp.NextToken

            return nil
        }

        isRetry := func(err error) bool {
            return !strings.Contains(err.Error(), "RequestThrottledException")
        }
        err = retry.OnError(backoff, isRetry, listChangeBlocks)

        if err != nil {
            return 0, numApiReq, errors.Annotatef(err, "ListChangedBlocks() failed against volume id %s, preSnapshot id %s, snapshot id %s", volumeId, preSnapshotId, snapshotId)
        }

        if isAllChangeListed {
            break
        }
    }

    elapsed := time.Since(start)

    klog.Infof("incremental snapshot size %s, num of api ListChangedBlocks request %d, snapshot id %s, volume id %s, takes %v",
        humanize.Bytes(snapshotSize), numApiReq, snapshotId, volumeId, elapsed)
    return snapshotSize, numApiReq, nil
}
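One possible variation on the error classification above, shown here as an assumption rather than anything this commit does: the predicate could inspect the structured aws-sdk-go error code through awserr.Error instead of substring-matching err.Error(), while keeping the same polarity as the commit's isRetry. A sketch:

package main

import (
    "fmt"
    "strings"

    "github.com/aws/aws-sdk-go/aws/awserr"
)

// isThrottled reports whether err carries the EBS throttling error code.
// It first tries the structured aws-sdk-go error type and falls back to the
// substring check used in the commit.
func isThrottled(err error) bool {
    if aerr, ok := err.(awserr.Error); ok {
        return aerr.Code() == "RequestThrottledException"
    }
    return strings.Contains(err.Error(), "RequestThrottledException")
}

// isRetry mirrors the polarity of the commit's predicate: retry anything
// that is not a throttling error.
func isRetry(err error) bool {
    return !isThrottled(err)
}

func main() {
    // awserr.New builds a structured AWS error for demonstration.
    throttled := awserr.New("RequestThrottledException", "rate exceeded", nil)
    other := fmt.Errorf("connection reset by peer")

    fmt.Println(isRetry(throttled)) // false
    fmt.Println(isRetry(other))     // true
}

Matching on the error code avoids false positives if the code string ever appears inside an unrelated error message.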

