Skip to content

Commit

Permalink
Add retry manager to reduce RateLimitExceeded errors
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSirenko committed Apr 18, 2024
1 parent 4e6b82b commit 9e8c545
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 26 deletions.
42 changes: 33 additions & 9 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ type cloud struct {
ec2 EC2API
dm dm.DeviceManager
bm *batcherManager
rm *retryManager
}

var _ Cloud = &cloud{}
Expand Down Expand Up @@ -328,6 +329,8 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
RecordRequestsMiddleware(),
RecordThrottledRequestsMiddleware(),
)

o.RetryMaxAttempts = retryMaxAttempt // Retry EC2 API calls at sdk level until request contexts are cancelled
})

var bm *batcherManager
Expand All @@ -336,11 +339,14 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc
bm = newBatcherManager(svc)
}

rm := newRetryManager()

return &cloud{
region: region,
dm: dm.NewDeviceManager(),
ec2: svc,
bm: bm,
rm: rm,
}
}

Expand Down Expand Up @@ -591,7 +597,9 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
requestInput.SnapshotId = aws.String(snapshotID)
}

response, err := c.ec2.CreateVolume(ctx, requestInput)
response, err := c.ec2.CreateVolume(ctx, requestInput, func(o *ec2.Options) {
o.Retryer = c.rm.createVolumeRetryer
})
if err != nil {
if isAWSErrorSnapshotNotFound(err) {
return nil, ErrNotFound
Expand Down Expand Up @@ -723,7 +731,9 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize
if options.Throughput != 0 {
req.Throughput = aws.Int32(options.Throughput)
}
response, err := c.ec2.ModifyVolume(ctx, req)
response, err := c.ec2.ModifyVolume(ctx, req, func(o *ec2.Options) {
o.Retryer = c.rm.modifyVolumeRetryer
})
if err != nil {
if isAWSErrorInvalidParameter(err) {
// Wrap error to preserve original message from AWS as to why this was an invalid argument
Expand All @@ -745,7 +755,9 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: &volumeID}
if _, err := c.ec2.DeleteVolume(ctx, request); err != nil {
if _, err := c.ec2.DeleteVolume(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.deleteVolumeRetryer
}); err != nil {
if isAWSErrorVolumeNotFound(err) {
return false, ErrNotFound
}
Expand Down Expand Up @@ -850,7 +862,9 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string
VolumeId: aws.String(volumeID),
}

resp, attachErr := c.ec2.AttachVolume(ctx, request)
resp, attachErr := c.ec2.AttachVolume(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.attachVolumeRetryer
})
if attachErr != nil {
if isAWSErrorBlockDeviceInUse(attachErr) {
cacheMutex.Lock()
Expand Down Expand Up @@ -923,7 +937,9 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error {
VolumeId: aws.String(volumeID),
}

_, err = c.ec2.DetachVolume(ctx, request)
_, err = c.ec2.DetachVolume(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.detachVolumeRetryer
})
if err != nil {
if isAWSErrorIncorrectState(err) ||
isAWSErrorInvalidAttachmentNotFound(err) ||
Expand Down Expand Up @@ -1225,7 +1241,9 @@ func (c *cloud) CreateSnapshot(ctx context.Context, volumeID string, snapshotOpt
Description: aws.String(descriptions),
}

res, err := c.ec2.CreateSnapshot(ctx, request)
res, err := c.ec2.CreateSnapshot(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.createSnapshotRetryer
})
if err != nil {
return nil, fmt.Errorf("error creating snapshot of volume %s: %w", volumeID, err)
}
Expand All @@ -1246,7 +1264,9 @@ func (c *cloud) DeleteSnapshot(ctx context.Context, snapshotID string) (success
request := &ec2.DeleteSnapshotInput{}
request.SnapshotId = aws.String(snapshotID)
request.DryRun = aws.Bool(false)
if _, err := c.ec2.DeleteSnapshot(ctx, request); err != nil {
if _, err := c.ec2.DeleteSnapshot(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.deleteSnapshotRetryer
}); err != nil {
if isAWSErrorSnapshotNotFound(err) {
return false, ErrNotFound
}
Expand Down Expand Up @@ -1353,7 +1373,9 @@ func (c *cloud) EnableFastSnapshotRestores(ctx context.Context, availabilityZone
SourceSnapshotIds: []string{snapshotID},
}
klog.V(4).InfoS("Creating Fast Snapshot Restores", "snapshotID", snapshotID, "availabilityZones", availabilityZones)
response, err := c.ec2.EnableFastSnapshotRestores(ctx, request)
response, err := c.ec2.EnableFastSnapshotRestores(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.enableFastSnapshotRestoresRetryer
})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1709,7 +1731,9 @@ func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string
}

if c.bm == nil || !isBatchable {
mod, err := c.ec2.DescribeVolumesModifications(ctx, request)
mod, err := c.ec2.DescribeVolumesModifications(ctx, request, func(o *ec2.Options) {
o.Retryer = c.rm.unbatchableDescribeVolumesModificationsRetryer
})
if err != nil {
if isAWSErrorModificationNotFound(err) {
return nil, VolumeNotBeingModified
Expand Down
35 changes: 18 additions & 17 deletions pkg/cloud/cloud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1216,7 +1216,7 @@ func TestCreateDisk(t *testing.T) {
defer ctxCancel()

if tc.expCreateVolumeInput != nil {
mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(&ec2.CreateVolumeOutput{
mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.CreateVolumeOutput{
VolumeId: aws.String(tc.diskOptions.Tags[VolumeNameTagKey]),
Size: aws.Int32(util.BytesToGiB(tc.diskOptions.CapacityBytes)),
OutpostArn: aws.String(tc.diskOptions.OutpostArn),
Expand All @@ -1234,7 +1234,7 @@ func TestCreateDisk(t *testing.T) {
}, tc.expDescVolumeErr).AnyTimes()
if tc.diskOptions.AvailabilityZone == "snow" {
mockEC2.EXPECT().CreateTags(gomock.Any(), gomock.Any()).Return(&ec2.CreateTagsOutput{}, tc.expCreateTagsErr)
mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, nil).AnyTimes()
mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, nil).AnyTimes()
}
if len(tc.diskOptions.SnapshotID) > 0 {
mockEC2.EXPECT().DescribeSnapshots(gomock.Any(), gomock.Any()).Return(&ec2.DescribeSnapshotsOutput{Snapshots: []types.Snapshot{snapshot}}, nil).AnyTimes()
Expand Down Expand Up @@ -1313,7 +1313,7 @@ func TestDeleteDisk(t *testing.T) {
c := newCloud(mockEC2)

ctx := context.Background()
mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr)
mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr)

ok, err := c.DeleteDisk(ctx, tc.volumeID)
if err != nil && tc.expErr == nil {
Expand Down Expand Up @@ -1362,7 +1362,7 @@ func TestAttachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest)).Return(&ec2.AttachVolumeOutput{
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{
Device: aws.String(path),
InstanceId: aws.String(nodeID),
VolumeId: aws.String(volumeID),
Expand All @@ -1385,7 +1385,7 @@ func TestAttachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest)).Return(&ec2.AttachVolumeOutput{
mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{
Device: aws.String(path),
InstanceId: aws.String(nodeID),
VolumeId: aws.String(volumeID),
Expand Down Expand Up @@ -1435,7 +1435,7 @@ func TestAttachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(gomock.Any(), attachRequest).Return(nil, errors.New("AttachVolume error")),
mockEC2.EXPECT().AttachVolume(gomock.Any(), attachRequest, gomock.Any()).Return(nil, errors.New("AttachVolume error")),
)
},
validateFunc: func(t *testing.T) {
Expand All @@ -1454,7 +1454,7 @@ func TestAttachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(ctx, attachRequest).Return(nil, blockDeviceInUseErr),
mockEC2.EXPECT().AttachVolume(ctx, attachRequest, gomock.Any()).Return(nil, blockDeviceInUseErr),
)
},
validateFunc: func(t *testing.T) {
Expand Down Expand Up @@ -1500,7 +1500,7 @@ func TestAttachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(ctx, gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest)).Return(&ec2.AttachVolumeOutput{
mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{
Device: aws.String(path),
InstanceId: aws.String(nodeID),
VolumeId: aws.String(volumeID),
Expand All @@ -1509,7 +1509,7 @@ func TestAttachDisk(t *testing.T) {
mockEC2.EXPECT().DescribeVolumes(ctx, gomock.Eq(volumeRequest)).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil),

mockEC2.EXPECT().DescribeInstances(ctx, gomock.Eq(createInstanceRequest2)).Return(newDescribeInstancesOutput(nodeID2), nil),
mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest2)).Return(&ec2.AttachVolumeOutput{
mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest2), gomock.Any()).Return(&ec2.AttachVolumeOutput{
Device: aws.String(path),
InstanceId: aws.String(nodeID2),
VolumeId: aws.String(volumeID),
Expand Down Expand Up @@ -1580,7 +1580,7 @@ func TestDetachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest).Return(nil, nil),
mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest, gomock.Any()).Return(nil, nil),
mockEC2.EXPECT().DescribeVolumes(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, "", "detached"), nil),
)
},
Expand All @@ -1596,7 +1596,7 @@ func TestDetachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest).Return(nil, errors.New("DetachVolume error")),
mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest, gomock.Any()).Return(nil, errors.New("DetachVolume error")),
)
},
},
Expand All @@ -1611,7 +1611,7 @@ func TestDetachDisk(t *testing.T) {

gomock.InOrder(
mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil),
mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest).Return(nil, ErrNotFound),
mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest, gomock.Any()).Return(nil, ErrNotFound),
)
},
},
Expand Down Expand Up @@ -1984,7 +1984,7 @@ func TestEnableFastSnapshotRestores(t *testing.T) {
c := newCloud(mockEC2)

ctx := context.Background()
mockEC2.EXPECT().EnableFastSnapshotRestores(gomock.Any(), gomock.Any()).Return(tc.expOutput, tc.expErr).AnyTimes()
mockEC2.EXPECT().EnableFastSnapshotRestores(gomock.Any(), gomock.Any(), gomock.Any()).Return(tc.expOutput, tc.expErr).AnyTimes()

response, err := c.EnableFastSnapshotRestores(ctx, tc.availabilityZones, tc.snapshotID)

Expand Down Expand Up @@ -2101,7 +2101,7 @@ func TestDeleteSnapshot(t *testing.T) {
c := newCloud(mockEC2)

ctx := context.Background()
mockEC2.EXPECT().DeleteSnapshot(gomock.Any(), gomock.Any()).Return(&ec2.DeleteSnapshotOutput{}, tc.expErr)
mockEC2.EXPECT().DeleteSnapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.DeleteSnapshotOutput{}, tc.expErr)

_, err := c.DeleteSnapshot(ctx, tc.snapshotName)
if err != nil {
Expand Down Expand Up @@ -2425,13 +2425,13 @@ func TestResizeOrModifyDisk(t *testing.T) {
}
}
if tc.modifiedVolume != nil || tc.modifiedVolumeError != nil {
mockEC2.EXPECT().ModifyVolume(gomock.Any(), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes()
mockEC2.EXPECT().ModifyVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes()
}
if tc.descModVolume != nil {
mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(tc.descModVolume, nil).AnyTimes()
mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any(), gomock.Any()).Return(tc.descModVolume, nil).AnyTimes()
} else {
emptyOutput := &ec2.DescribeVolumesModificationsOutput{}
mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(emptyOutput, nil).AnyTimes()
mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any(), gomock.Any()).Return(emptyOutput, nil).AnyTimes()
}

newSize, err := c.ResizeOrModifyDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB), tc.modifyDiskOptions)
Expand Down Expand Up @@ -3077,6 +3077,7 @@ func newCloud(mockEC2 EC2API) Cloud {
region: "test-region",
dm: dm.NewDeviceManager(),
ec2: mockEC2,
rm: newRetryManager(),
}
return c
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/cloud/retry_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package cloud

import (
"github.com/aws/aws-sdk-go-v2/aws/retry"
)

const (
	// retryMaxAttempt sets the maximum number of attempts (initial call plus
	// retries) for each EC2 API call made at the SDK level.
	// Set high enough to ensure default sidecar timeout will cancel context long before we stop retrying.
	retryMaxAttempt = 50
)

// newAdaptiveRetryer returns a retry.AdaptiveMode retryer, which restricts
// attempts of API calls that recently hit throttle errors while capping the
// total attempts at retryMaxAttempt.
func newAdaptiveRetryer() *retry.AdaptiveMode {
	// Raise the standard retryer's attempt cap; the adaptive client-side
	// rate limiting is layered on top of these standard options.
	capAttempts := func(so *retry.StandardOptions) {
		so.MaxAttempts = retryMaxAttempt
	}
	return retry.NewAdaptiveMode(func(ao *retry.AdaptiveModeOptions) {
		ao.StandardOptions = append(ao.StandardOptions, capAttempts)
	})
}

// retryManager dictates the retry strategies of EC2 API calls.
// Each mutating EC2 API has its own retryer because the AWS SDK throttles on a retryer object level, not by API name.
// While default AWS accounts share request tokens between mutating APIs, users can raise limits for individual APIs.
// Separate retryers ensure that throttling one API doesn't unintentionally throttle others with separate token buckets.
type retryManager struct {
	// One adaptive retryer per EC2 API; assigned per-call via ec2.Options.Retryer.
	createVolumeRetryer                            *retry.AdaptiveMode
	deleteVolumeRetryer                            *retry.AdaptiveMode
	attachVolumeRetryer                            *retry.AdaptiveMode
	detachVolumeRetryer                            *retry.AdaptiveMode
	modifyVolumeRetryer                            *retry.AdaptiveMode
	createSnapshotRetryer                          *retry.AdaptiveMode
	deleteSnapshotRetryer                          *retry.AdaptiveMode
	enableFastSnapshotRestoresRetryer              *retry.AdaptiveMode
	// Used only on the unbatched DescribeVolumesModifications path; batched
	// describes go through the batcherManager instead.
	unbatchableDescribeVolumesModificationsRetryer *retry.AdaptiveMode
}

// newRetryManager constructs a retryManager with a dedicated adaptive
// retryer for every EC2 API it covers, so each API draws from its own
// client-side retry token bucket.
func newRetryManager() *retryManager {
	rm := &retryManager{}
	rm.createVolumeRetryer = newAdaptiveRetryer()
	rm.deleteVolumeRetryer = newAdaptiveRetryer()
	rm.attachVolumeRetryer = newAdaptiveRetryer()
	rm.detachVolumeRetryer = newAdaptiveRetryer()
	rm.modifyVolumeRetryer = newAdaptiveRetryer()
	rm.createSnapshotRetryer = newAdaptiveRetryer()
	rm.deleteSnapshotRetryer = newAdaptiveRetryer()
	rm.enableFastSnapshotRestoresRetryer = newAdaptiveRetryer()
	rm.unbatchableDescribeVolumesModificationsRetryer = newAdaptiveRetryer()
	return rm
}

0 comments on commit 9e8c545

Please sign in to comment.