diff --git a/pkg/cloud/cloud.go b/pkg/cloud/cloud.go index ac6984bc7b..a0615593b4 100644 --- a/pkg/cloud/cloud.go +++ b/pkg/cloud/cloud.go @@ -281,6 +281,7 @@ type cloud struct { ec2 EC2API dm dm.DeviceManager bm *batcherManager + rm *retryManager } var _ Cloud = &cloud{} @@ -328,6 +329,8 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc RecordRequestsMiddleware(), RecordThrottledRequestsMiddleware(), ) + + o.RetryMaxAttempts = retryMaxAttempt // Retry EC2 API calls at sdk level until request contexts are cancelled }) var bm *batcherManager @@ -336,11 +339,14 @@ func newEC2Cloud(region string, awsSdkDebugLog bool, userAgentExtra string, batc bm = newBatcherManager(svc) } + rm := newRetryManager() + return &cloud{ region: region, dm: dm.NewDeviceManager(), ec2: svc, bm: bm, + rm: rm, } } @@ -591,7 +597,9 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions * requestInput.SnapshotId = aws.String(snapshotID) } - response, err := c.ec2.CreateVolume(ctx, requestInput) + response, err := c.ec2.CreateVolume(ctx, requestInput, func(o *ec2.Options) { + o.Retryer = c.rm.createVolumeRetryer + }) if err != nil { if isAWSErrorSnapshotNotFound(err) { return nil, ErrNotFound @@ -723,7 +731,9 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize if options.Throughput != 0 { req.Throughput = aws.Int32(options.Throughput) } - response, err := c.ec2.ModifyVolume(ctx, req) + response, err := c.ec2.ModifyVolume(ctx, req, func(o *ec2.Options) { + o.Retryer = c.rm.modifyVolumeRetryer + }) if err != nil { if isAWSErrorInvalidParameter(err) { // Wrap error to preserve original message from AWS as to why this was an invalid argument @@ -745,7 +755,9 @@ func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSize func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) { request := &ec2.DeleteVolumeInput{VolumeId: &volumeID} - if _, err := 
c.ec2.DeleteVolume(ctx, request); err != nil { + if _, err := c.ec2.DeleteVolume(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.deleteVolumeRetryer + }); err != nil { if isAWSErrorVolumeNotFound(err) { return false, ErrNotFound } @@ -850,7 +862,9 @@ func (c *cloud) AttachDisk(ctx context.Context, volumeID, nodeID string) (string VolumeId: aws.String(volumeID), } - resp, attachErr := c.ec2.AttachVolume(ctx, request) + resp, attachErr := c.ec2.AttachVolume(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.attachVolumeRetryer + }) if attachErr != nil { if isAWSErrorBlockDeviceInUse(attachErr) { cacheMutex.Lock() @@ -923,7 +937,9 @@ func (c *cloud) DetachDisk(ctx context.Context, volumeID, nodeID string) error { VolumeId: aws.String(volumeID), } - _, err = c.ec2.DetachVolume(ctx, request) + _, err = c.ec2.DetachVolume(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.detachVolumeRetryer + }) if err != nil { if isAWSErrorIncorrectState(err) || isAWSErrorInvalidAttachmentNotFound(err) || @@ -1225,7 +1241,9 @@ func (c *cloud) CreateSnapshot(ctx context.Context, volumeID string, snapshotOpt Description: aws.String(descriptions), } - res, err := c.ec2.CreateSnapshot(ctx, request) + res, err := c.ec2.CreateSnapshot(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.createSnapshotRetryer + }) if err != nil { return nil, fmt.Errorf("error creating snapshot of volume %s: %w", volumeID, err) } @@ -1246,7 +1264,9 @@ func (c *cloud) DeleteSnapshot(ctx context.Context, snapshotID string) (success request := &ec2.DeleteSnapshotInput{} request.SnapshotId = aws.String(snapshotID) request.DryRun = aws.Bool(false) - if _, err := c.ec2.DeleteSnapshot(ctx, request); err != nil { + if _, err := c.ec2.DeleteSnapshot(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.deleteSnapshotRetryer + }); err != nil { if isAWSErrorSnapshotNotFound(err) { return false, ErrNotFound } @@ -1353,7 +1373,9 @@ func (c *cloud) EnableFastSnapshotRestores(ctx context.Context, 
availabilityZone SourceSnapshotIds: []string{snapshotID}, } klog.V(4).InfoS("Creating Fast Snapshot Restores", "snapshotID", snapshotID, "availabilityZones", availabilityZones) - response, err := c.ec2.EnableFastSnapshotRestores(ctx, request) + response, err := c.ec2.EnableFastSnapshotRestores(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.enableFastSnapshotRestoresRetryer + }) if err != nil { return nil, err } @@ -1709,7 +1731,9 @@ func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string } if c.bm == nil || !isBatchable { - mod, err := c.ec2.DescribeVolumesModifications(ctx, request) + mod, err := c.ec2.DescribeVolumesModifications(ctx, request, func(o *ec2.Options) { + o.Retryer = c.rm.unbatchableDescribeVolumesModificationsRetryer + }) if err != nil { if isAWSErrorModificationNotFound(err) { return nil, VolumeNotBeingModified diff --git a/pkg/cloud/cloud_test.go b/pkg/cloud/cloud_test.go index 48f11f42df..65fd1ceea0 100644 --- a/pkg/cloud/cloud_test.go +++ b/pkg/cloud/cloud_test.go @@ -1216,7 +1216,7 @@ func TestCreateDisk(t *testing.T) { defer ctxCancel() if tc.expCreateVolumeInput != nil { - mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any()).Return(&ec2.CreateVolumeOutput{ + mockEC2.EXPECT().CreateVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.CreateVolumeOutput{ VolumeId: aws.String(tc.diskOptions.Tags[VolumeNameTagKey]), Size: aws.Int32(util.BytesToGiB(tc.diskOptions.CapacityBytes)), OutpostArn: aws.String(tc.diskOptions.OutpostArn), @@ -1234,7 +1234,7 @@ func TestCreateDisk(t *testing.T) { }, tc.expDescVolumeErr).AnyTimes() if tc.diskOptions.AvailabilityZone == "snow" { mockEC2.EXPECT().CreateTags(gomock.Any(), gomock.Any()).Return(&ec2.CreateTagsOutput{}, tc.expCreateTagsErr) - mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, nil).AnyTimes() + mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, 
nil).AnyTimes() } if len(tc.diskOptions.SnapshotID) > 0 { mockEC2.EXPECT().DescribeSnapshots(gomock.Any(), gomock.Any()).Return(&ec2.DescribeSnapshotsOutput{Snapshots: []types.Snapshot{snapshot}}, nil).AnyTimes() @@ -1313,7 +1313,7 @@ func TestDeleteDisk(t *testing.T) { c := newCloud(mockEC2) ctx := context.Background() - mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr) + mockEC2.EXPECT().DeleteVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.DeleteVolumeOutput{}, tc.expErr) ok, err := c.DeleteDisk(ctx, tc.volumeID) if err != nil && tc.expErr == nil { @@ -1362,7 +1362,7 @@ func TestAttachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest)).Return(&ec2.AttachVolumeOutput{ + mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{ Device: aws.String(path), InstanceId: aws.String(nodeID), VolumeId: aws.String(volumeID), @@ -1385,7 +1385,7 @@ func TestAttachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(gomock.Any(), gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest)).Return(&ec2.AttachVolumeOutput{ + mockEC2.EXPECT().AttachVolume(gomock.Any(), gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{ Device: aws.String(path), InstanceId: aws.String(nodeID), VolumeId: aws.String(volumeID), @@ -1435,7 +1435,7 @@ func TestAttachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().AttachVolume(gomock.Any(), attachRequest).Return(nil, errors.New("AttachVolume error")), + mockEC2.EXPECT().AttachVolume(gomock.Any(), 
attachRequest, gomock.Any()).Return(nil, errors.New("AttachVolume error")), ) }, validateFunc: func(t *testing.T) { @@ -1454,7 +1454,7 @@ func TestAttachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(ctx, instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().AttachVolume(ctx, attachRequest).Return(nil, blockDeviceInUseErr), + mockEC2.EXPECT().AttachVolume(ctx, attachRequest, gomock.Any()).Return(nil, blockDeviceInUseErr), ) }, validateFunc: func(t *testing.T) { @@ -1500,7 +1500,7 @@ func TestAttachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(ctx, gomock.Eq(instanceRequest)).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest)).Return(&ec2.AttachVolumeOutput{ + mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest), gomock.Any()).Return(&ec2.AttachVolumeOutput{ Device: aws.String(path), InstanceId: aws.String(nodeID), VolumeId: aws.String(volumeID), @@ -1509,7 +1509,7 @@ func TestAttachDisk(t *testing.T) { mockEC2.EXPECT().DescribeVolumes(ctx, gomock.Eq(volumeRequest)).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, path, "attached"), nil), mockEC2.EXPECT().DescribeInstances(ctx, gomock.Eq(createInstanceRequest2)).Return(newDescribeInstancesOutput(nodeID2), nil), - mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest2)).Return(&ec2.AttachVolumeOutput{ + mockEC2.EXPECT().AttachVolume(ctx, gomock.Eq(attachRequest2), gomock.Any()).Return(&ec2.AttachVolumeOutput{ Device: aws.String(path), InstanceId: aws.String(nodeID2), VolumeId: aws.String(volumeID), @@ -1580,7 +1580,7 @@ func TestDetachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest).Return(nil, nil), + mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest, gomock.Any()).Return(nil, 
nil), mockEC2.EXPECT().DescribeVolumes(gomock.Any(), volumeRequest).Return(createDescribeVolumesOutput([]*string{&volumeID}, nodeID, "", "detached"), nil), ) }, @@ -1596,7 +1596,7 @@ func TestDetachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest).Return(nil, errors.New("DetachVolume error")), + mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest, gomock.Any()).Return(nil, errors.New("DetachVolume error")), ) }, }, @@ -1611,7 +1611,7 @@ func TestDetachDisk(t *testing.T) { gomock.InOrder( mockEC2.EXPECT().DescribeInstances(gomock.Any(), instanceRequest).Return(newDescribeInstancesOutput(nodeID), nil), - mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest).Return(nil, ErrNotFound), + mockEC2.EXPECT().DetachVolume(gomock.Any(), detachRequest, gomock.Any()).Return(nil, ErrNotFound), ) }, }, @@ -1984,7 +1984,7 @@ func TestEnableFastSnapshotRestores(t *testing.T) { c := newCloud(mockEC2) ctx := context.Background() - mockEC2.EXPECT().EnableFastSnapshotRestores(gomock.Any(), gomock.Any()).Return(tc.expOutput, tc.expErr).AnyTimes() + mockEC2.EXPECT().EnableFastSnapshotRestores(gomock.Any(), gomock.Any(), gomock.Any()).Return(tc.expOutput, tc.expErr).AnyTimes() response, err := c.EnableFastSnapshotRestores(ctx, tc.availabilityZones, tc.snapshotID) @@ -2101,7 +2101,7 @@ func TestDeleteSnapshot(t *testing.T) { c := newCloud(mockEC2) ctx := context.Background() - mockEC2.EXPECT().DeleteSnapshot(gomock.Any(), gomock.Any()).Return(&ec2.DeleteSnapshotOutput{}, tc.expErr) + mockEC2.EXPECT().DeleteSnapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(&ec2.DeleteSnapshotOutput{}, tc.expErr) _, err := c.DeleteSnapshot(ctx, tc.snapshotName) if err != nil { @@ -2425,13 +2425,13 @@ func TestResizeOrModifyDisk(t *testing.T) { } } if tc.modifiedVolume != nil || tc.modifiedVolumeError != nil { - 
mockEC2.EXPECT().ModifyVolume(gomock.Any(), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes() + mockEC2.EXPECT().ModifyVolume(gomock.Any(), gomock.Any(), gomock.Any()).Return(tc.modifiedVolume, tc.modifiedVolumeError).AnyTimes() } if tc.descModVolume != nil { - mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(tc.descModVolume, nil).AnyTimes() + mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any(), gomock.Any()).Return(tc.descModVolume, nil).AnyTimes() } else { emptyOutput := &ec2.DescribeVolumesModificationsOutput{} - mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any()).Return(emptyOutput, nil).AnyTimes() + mockEC2.EXPECT().DescribeVolumesModifications(gomock.Any(), gomock.Any(), gomock.Any()).Return(emptyOutput, nil).AnyTimes() } newSize, err := c.ResizeOrModifyDisk(ctx, tc.volumeID, util.GiBToBytes(tc.reqSizeGiB), tc.modifyDiskOptions) @@ -3077,6 +3077,7 @@ func newCloud(mockEC2 EC2API) Cloud { region: "test-region", dm: dm.NewDeviceManager(), ec2: mockEC2, + rm: newRetryManager(), } return c } diff --git a/pkg/cloud/retry_manager.go b/pkg/cloud/retry_manager.go new file mode 100644 index 0000000000..197a67b023 --- /dev/null +++ b/pkg/cloud/retry_manager.go @@ -0,0 +1,50 @@ +package cloud + +import ( + "github.com/aws/aws-sdk-go-v2/aws/retry" +) + +const ( + // retryMaxAttempt sets the maximum number of attempts for an EC2 API call. + // It is set high enough to ensure the default sidecar timeout will cancel the context long before we stop retrying. + retryMaxAttempt = 50 +) + +// newAdaptiveRetryer returns an adaptive-mode retryer, capped at retryMaxAttempt attempts, that restricts attempts of API calls that recently hit throttle errors. +func newAdaptiveRetryer() *retry.AdaptiveMode { + return retry.NewAdaptiveMode(func(ao *retry.AdaptiveModeOptions) { + ao.StandardOptions = append(ao.StandardOptions, func(so *retry.StandardOptions) { + so.MaxAttempts = retryMaxAttempt + }) + }) +} + +// retryManager dictates the retry strategies of EC2 API calls. 
+// Each mutating EC2 API has its own retryer because the AWS SDK throttles at the retryer-object level, not by API name. +// While default AWS accounts share request tokens between mutating APIs, users can raise limits for individual APIs. +// Separate retryers ensure that throttling one API doesn't unintentionally throttle others with separate token buckets. +type retryManager struct { + createVolumeRetryer *retry.AdaptiveMode + deleteVolumeRetryer *retry.AdaptiveMode + attachVolumeRetryer *retry.AdaptiveMode + detachVolumeRetryer *retry.AdaptiveMode + modifyVolumeRetryer *retry.AdaptiveMode + createSnapshotRetryer *retry.AdaptiveMode + deleteSnapshotRetryer *retry.AdaptiveMode + enableFastSnapshotRestoresRetryer *retry.AdaptiveMode + unbatchableDescribeVolumesModificationsRetryer *retry.AdaptiveMode +} + +func newRetryManager() *retryManager { + return &retryManager{ + createVolumeRetryer: newAdaptiveRetryer(), + attachVolumeRetryer: newAdaptiveRetryer(), + deleteVolumeRetryer: newAdaptiveRetryer(), + detachVolumeRetryer: newAdaptiveRetryer(), + modifyVolumeRetryer: newAdaptiveRetryer(), + createSnapshotRetryer: newAdaptiveRetryer(), + deleteSnapshotRetryer: newAdaptiveRetryer(), + enableFastSnapshotRestoresRetryer: newAdaptiveRetryer(), + unbatchableDescribeVolumesModificationsRetryer: newAdaptiveRetryer(), + } +}