Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch polling of DescribeVolumesModifications #1965

Merged
merged 2 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 109 additions & 23 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ var (

// ErrInvalidArgument is returned if parameters were rejected by cloud provider
ErrInvalidArgument = errors.New("invalid argument")

// ErrInvalidRequest is returned if parameters were rejected by driver
ErrInvalidRequest = errors.New("invalid request")
)

// Set during build time via -ldflags
Expand Down Expand Up @@ -265,11 +268,12 @@ type snapshotBatcherType int

// batcherManager maintains a collection of batchers for different types of tasks.
type batcherManager struct {
volumeIDBatcher *batcher.Batcher[string, *types.Volume]
volumeTagBatcher *batcher.Batcher[string, *types.Volume]
instanceIDBatcher *batcher.Batcher[string, *types.Instance]
snapshotIDBatcher *batcher.Batcher[string, *types.Snapshot]
snapshotTagBatcher *batcher.Batcher[string, *types.Snapshot]
volumeIDBatcher *batcher.Batcher[string, *types.Volume]
volumeTagBatcher *batcher.Batcher[string, *types.Volume]
instanceIDBatcher *batcher.Batcher[string, *types.Instance]
snapshotIDBatcher *batcher.Batcher[string, *types.Snapshot]
snapshotTagBatcher *batcher.Batcher[string, *types.Snapshot]
volumeModificationIDBatcher *batcher.Batcher[string, *types.VolumeModification]
}

type cloud struct {
Expand Down Expand Up @@ -358,6 +362,9 @@ func newBatcherManager(svc EC2API) *batcherManager {
snapshotTagBatcher: batcher.New(1000, 300*time.Millisecond, func(names []string) (map[string]*types.Snapshot, error) {
return execBatchDescribeSnapshots(svc, names, snapshotTagBatcher)
}),
volumeModificationIDBatcher: batcher.New(500, 300*time.Millisecond, func(names []string) (map[string]*types.VolumeModification, error) {
return execBatchDescribeVolumesModifications(svc, names)
}),
}
}

Expand Down Expand Up @@ -428,7 +435,7 @@ func (c *cloud) batchDescribeVolumes(request *ec2.DescribeVolumesInput) (*types.
task = request.Filters[0].Values[0]

default:
return nil, fmt.Errorf("batchDescribeVolumes: invalid request, request: %v", request)
return nil, fmt.Errorf("%w: batchDescribeVolumes: request: %v", ErrInvalidRequest, request)
}

ch := make(chan batcher.BatchResult[*types.Volume])
Expand Down Expand Up @@ -638,7 +645,56 @@ func (c *cloud) CreateDisk(ctx context.Context, volumeName string, diskOptions *
return &Disk{CapacityGiB: size, VolumeID: volumeID, AvailabilityZone: zone, SnapshotID: snapshotID, OutpostArn: outpostArn}, nil
}

// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rouding up to the next possible allocatable unit, and/or modifies an EBS
// execBatchDescribeVolumesModifications executes a batched DescribeVolumesModifications API call
func execBatchDescribeVolumesModifications(svc EC2API, input []string) (map[string]*types.VolumeModification, error) {
klog.V(7).InfoS("execBatchDescribeVolumeModifications", "volumeIds", input)
request := &ec2.DescribeVolumesModificationsInput{
VolumeIds: input,
}

ctx, cancel := context.WithTimeout(context.Background(), batchDescribeTimeout)
defer cancel()

resp, err := describeVolumesModifications(ctx, svc, request)
if err != nil {
return nil, err
}

result := make(map[string]*types.VolumeModification)

for _, m := range resp {
volumeModification := m
result[*volumeModification.VolumeId] = &volumeModification
}

klog.V(7).InfoS("execBatchDescribeVolumeModifications: success", "result", result)
return result, nil
}

// batchDescribeVolumesModifications processes a DescribeVolumesModifications request by queuing the task and waiting for the result.
func (c *cloud) batchDescribeVolumesModifications(request *ec2.DescribeVolumesModificationsInput) (*types.VolumeModification, error) {
var task string

if len(request.VolumeIds) == 1 && request.VolumeIds[0] != "" {
task = request.VolumeIds[0]
} else {
return nil, fmt.Errorf("%w: batchDescribeVolumesModifications: invalid request, request: %v", ErrInvalidRequest, request)
}

ch := make(chan batcher.BatchResult[*types.VolumeModification])

b := c.bm.volumeModificationIDBatcher
b.AddTask(task, ch)

r := <-ch

if r.Err != nil {
return nil, r.Err
}
return r.Result, nil
}

// ResizeOrModifyDisk resizes an EBS volume in GiB increments, rounding up to the next possible allocatable unit, and/or modifies an EBS
// volume with the parameters in ModifyDiskOptions.
// The resizing operation is performed only when newSizeBytes != 0.
// It returns the volume size after this call or an error if the size couldn't be determined or the volume couldn't be modified.
Expand Down Expand Up @@ -705,7 +761,7 @@ func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
return true, nil
}

// executes a batched DescribeInstances API call
// execBatchDescribeInstances executes a batched DescribeInstances API call
func execBatchDescribeInstances(svc EC2API, input []string) (map[string]*types.Instance, error) {
klog.V(7).InfoS("execBatchDescribeInstances", "instanceIds", input)
request := &ec2.DescribeInstancesInput{
Expand Down Expand Up @@ -742,7 +798,7 @@ func (c *cloud) batchDescribeInstances(request *ec2.DescribeInstancesInput) (*ty
if len(request.InstanceIds) == 1 && request.InstanceIds[0] != "" {
task = request.InstanceIds[0]
} else {
return nil, fmt.Errorf("batchDescribeInstances: invalid request, request: %v", request)
return nil, fmt.Errorf("%w: batchDescribeInstances: request: %v", ErrInvalidRequest, request)
}

ch := make(chan batcher.BatchResult[*types.Instance])
Expand Down Expand Up @@ -1108,7 +1164,7 @@ func (c *cloud) batchDescribeSnapshots(request *ec2.DescribeSnapshotsInput) (*ty
task = request.Filters[0].Values[0]

default:
return nil, fmt.Errorf("batchDescribeSnapshots: invalid request, request: %v", request)
return nil, fmt.Errorf("%w: batchDescribeSnapshots: request: %v", ErrInvalidRequest, request)
}

ch := make(chan batcher.BatchResult[*types.Snapshot])
Expand Down Expand Up @@ -1598,7 +1654,7 @@ func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string)
}

waitErr := wait.ExponentialBackoff(backoff, func() (bool, error) {
m, err := c.getLatestVolumeModification(ctx, volumeID)
m, err := c.getLatestVolumeModification(ctx, volumeID, true)
// Consider volumes that have never been modified as done
if err != nil && errors.Is(err, VolumeNotBeingModified) {
return true, nil
Expand All @@ -1621,25 +1677,53 @@ func (c *cloud) waitForVolumeModification(ctx context.Context, volumeID string)
return nil
}

func describeVolumesModifications(ctx context.Context, svc EC2API, request *ec2.DescribeVolumesModificationsInput) ([]types.VolumeModification, error) {
volumeModifications := []types.VolumeModification{}
var nextToken *string
for {
response, err := svc.DescribeVolumesModifications(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {
ConnorJC3 marked this conversation as resolved.
Show resolved Hide resolved
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing volume modifications: %w", err)
}

volumeModifications = append(volumeModifications, response.VolumesModifications...)

nextToken = response.NextToken
if aws.ToString(nextToken) == "" {
break
}
request.NextToken = nextToken
}
return volumeModifications, nil
}

// getLatestVolumeModification returns the last modification of the volume.
func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string) (*types.VolumeModification, error) {
func (c *cloud) getLatestVolumeModification(ctx context.Context, volumeID string, isBatchable bool) (*types.VolumeModification, error) {
request := &ec2.DescribeVolumesModificationsInput{
VolumeIds: []string{volumeID},
}
mod, err := c.ec2.DescribeVolumesModifications(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {

if c.bm == nil || !isBatchable {
mod, err := c.ec2.DescribeVolumesModifications(ctx, request)
if err != nil {
if isAWSErrorModificationNotFound(err) {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing modifications in volume %q: %w", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, VolumeNotBeingModified
}
return nil, fmt.Errorf("error describing modifications in volume %q: %w", volumeID, err)
}

volumeMods := mod.VolumesModifications
if len(volumeMods) == 0 {
return nil, VolumeNotBeingModified
return &volumeMods[len(volumeMods)-1], nil
} else {
return c.batchDescribeVolumesModifications(request)
}

return &volumeMods[len(volumeMods)-1], nil
}

// randomAvailabilityZone returns a random zone from the given region
Expand Down Expand Up @@ -1710,7 +1794,8 @@ func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSi
}
oldSizeGiB := *volume.Size

latestMod, err := c.getLatestVolumeModification(ctx, volumeID)
// This call must NOT be batched because a missing volume modification will return client error
latestMod, err := c.getLatestVolumeModification(ctx, volumeID, false)
if err != nil && !errors.Is(err, VolumeNotBeingModified) {
return true, oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, err)
}
Expand All @@ -1733,6 +1818,7 @@ func (c *cloud) validateModifyVolume(ctx context.Context, volumeID string, newSi

// At this point, we know we are starting a new volume modification
// If we're asked to modify a volume to its current state, ignore the request and immediately return a success
// This is because as of March 2024, EC2 ModifyVolume calls that don't change any parameters still modify the volume
if !needsVolumeModification(*volume, newSizeGiB, options) {
klog.V(5).InfoS("[Debug] Skipping modification for volume due to matching stats", "volumeID", volumeID)
// Wait for any existing modifications to prevent race conditions where DescribeVolume(s) returns the new
Expand Down
Loading
Loading