Skip to content

Commit

Permalink
Request coalescing for resizing and modifying volume
Browse files Browse the repository at this point in the history
Signed-off-by: Hanyue Liang <[email protected]>
  • Loading branch information
hanyuel committed Jul 13, 2023
1 parent 6f2db76 commit 427b0ce
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 12 deletions.
1 change: 0 additions & 1 deletion docs/modify-volume.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ Users can specify the following PVC annotations:
## Considerations

- Keep in mind the [6 hour cooldown period](https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_ModifyVolume.html) for EBS ModifyVolume. Multiple ModifyVolume calls for the same volume within a 6 hour period will fail.
- It is not yet possible to update both the annotations and capacity of the PVC at the same time. This results in multiple RPC calls to the driver, and only one of them will succeed (due to the cooldown period). A future release of the driver will lift this restriction.
- Ensure that the desired volume properties are permissible. The driver performs only minimal client-side validation.

## Example
Expand Down
98 changes: 98 additions & 0 deletions pkg/cloud/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,104 @@ func (c *cloud) ModifyDisk(ctx context.Context, volumeID string, options *Modify
return c.waitForVolumeSize(ctx, volumeID)
}

// ResizeOrModifyDisk resizes the volume and/or changes its type, IOPS and
// throughput using a single EC2 ModifyVolume call.
//
// newSizeBytes is the requested capacity in bytes; zero means "do not resize".
// options carries the requested volume type, IOPS and throughput; zero-valued
// fields are left out of the EC2 request. A nil options is treated as "no
// property changes".
//
// When a resize was requested the returned size is whatever checkDesiredSize
// reports (or the size read before the call when waiting fails). For a
// modify-only request the returned size is always 0.
func (c *cloud) ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (int64, error) {
	if options == nil {
		options = &ModifyDiskOptions{}
	}

	resizeRequested := newSizeBytes != 0
	if resizeRequested {
		klog.V(4).InfoS("Received Resize and/or Modify Disk request", "volumeID", volumeID, "newSizeBytes", newSizeBytes, "options", options)
	} else {
		klog.V(4).InfoS("Received Modify Disk request", "volumeID", volumeID, "options", options)
	}

	newSizeGiB := util.RoundUpGiB(newSizeBytes)
	propertyChangeRequested := options.IOPS != 0 || options.VolumeType != "" || options.Throughput != 0

	var oldSizeGiB int64
	needsResize := false
	if resizeRequested {
		request := &ec2.DescribeVolumesInput{
			VolumeIds: []*string{
				aws.String(volumeID),
			},
		}
		volume, err := c.getVolume(ctx, request)
		if err != nil {
			return 0, err
		}

		// AWS resizes in chunks of GiB (not GB)
		oldSizeGiB = aws.Int64Value(volume.Size)

		latestMod, modFetchError := c.getLatestVolumeModification(ctx, volumeID)

		// A modification is already running: wait for it instead of issuing a
		// second ModifyVolume call, then verify the resulting size.
		if latestMod != nil && modFetchError == nil {
			if aws.StringValue(latestMod.ModificationState) == ec2.VolumeModificationStateModifying {
				if err := c.waitForVolumeSize(ctx, volumeID); err != nil {
					return oldSizeGiB, err
				}
				return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
			}
		}

		// if there was an error fetching volume modifications and it was anything other than VolumeNotBeingModified error
		// that means we have an API problem.
		if modFetchError != nil && !errors.Is(modFetchError, VolumeNotBeingModified) {
			return oldSizeGiB, fmt.Errorf("error fetching volume modifications for %q: %w", volumeID, modFetchError)
		}

		needsResize = newSizeGiB > oldSizeGiB

		// Even if existing volume size is greater than user requested size, we should ensure that there are no pending
		// volume modifications objects or volume has completed previously issued modification request.
		// Only short-circuit when nothing else was asked for; otherwise the
		// IOPS/type/throughput part of a coalesced request would be dropped.
		if !needsResize && !propertyChangeRequested {
			klog.V(5).InfoS("[Debug] Volume", "volumeID", volumeID, "oldSizeGiB", oldSizeGiB, "newSizeGiB", newSizeGiB)
			err = c.waitForVolumeSize(ctx, volumeID)
			if err != nil && !errors.Is(err, VolumeNotBeingModified) {
				return oldSizeGiB, err
			}
			return oldSizeGiB, nil
		}
	}

	req := &ec2.ModifyVolumeInput{
		VolumeId: aws.String(volumeID),
	}
	// Only set req.Size when the volume actually has to grow.
	if needsResize {
		req.Size = aws.Int64(newSizeGiB)
	}
	if options.IOPS != 0 {
		req.Iops = aws.Int64(int64(options.IOPS))
	}
	if options.VolumeType != "" {
		req.VolumeType = aws.String(options.VolumeType)
	}
	// Send throughput only when it was requested, so a throughput-only change
	// is not lost and a type change does not submit Throughput=0.
	if options.Throughput != 0 {
		req.Throughput = aws.Int64(int64(options.Throughput))
	}

	response, err := c.ec2.ModifyVolumeWithContext(ctx, req)
	if err != nil {
		return 0, fmt.Errorf("unable to modify AWS volume %q: %w", volumeID, err)
	}

	mod := response.VolumeModification
	state := aws.StringValue(mod.ModificationState)

	// Wait exactly once, and never discard the wait error.
	if !volumeModificationDone(state) {
		if err := c.waitForVolumeSize(ctx, volumeID); err != nil {
			if resizeRequested {
				return oldSizeGiB, err
			}
			return 0, err
		}
	}

	if resizeRequested {
		return c.checkDesiredSize(ctx, volumeID, newSizeGiB)
	}
	return 0, nil
}

func (c *cloud) DeleteDisk(ctx context.Context, volumeID string) (bool, error) {
request := &ec2.DeleteVolumeInput{VolumeId: &volumeID}
if _, err := c.ec2.DeleteVolumeWithContext(ctx, request); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/cloud_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type Cloud interface {
AttachDisk(ctx context.Context, volumeID string, nodeID string) (devicePath string, err error)
DetachDisk(ctx context.Context, volumeID string, nodeID string) (err error)
ResizeDisk(ctx context.Context, volumeID string, reqSize int64) (newSize int64, err error)
ResizeOrModifyDisk(ctx context.Context, volumeID string, newSizeBytes int64, options *ModifyDiskOptions) (newSize int64, err error)
WaitForAttachmentState(ctx context.Context, volumeID, expectedState string, expectedInstance string, expectedDevice string, alreadyAssigned bool) (*ec2.VolumeAttachment, error)
GetDiskByName(ctx context.Context, name string, capacityBytes int64) (disk *Disk, err error)
GetDiskByID(ctx context.Context, volumeID string) (disk *Disk, err error)
Expand Down
32 changes: 25 additions & 7 deletions pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ const isManagedByDriver = "true"

// controllerService represents the controller service of CSI driver
type controllerService struct {
cloud cloud.Cloud
inFlight *internal.InFlight
driverOptions *DriverOptions
cloud cloud.Cloud
inFlight *internal.InFlight
driverOptions *DriverOptions
// modifyVolumeManager keeps the per-volume request handlers used to
// coalesce resize and modify requests for the same volume.
modifyVolumeManager *modifyVolumeManager

rpc.UnimplementedModifyServer
}
Expand Down Expand Up @@ -97,9 +98,10 @@ func newControllerService(driverOptions *DriverOptions) controllerService {
}

return controllerService{
cloud: cloudSrv,
inFlight: internal.NewInFlight(),
driverOptions: driverOptions,
cloud: cloudSrv,
inFlight: internal.NewInFlight(),
driverOptions: driverOptions,
modifyVolumeManager: newModifyVolumeManager(),
}
}

Expand Down Expand Up @@ -508,11 +510,27 @@ func (d *controllerService) ControllerExpandVolume(ctx context.Context, req *csi
return nil, status.Error(codes.InvalidArgument, "After round-up, volume size exceeds the limit specified")
}

actualSizeGiB, err := d.cloud.ResizeDisk(ctx, volumeID, newSize)
modifyVolumeRequest := modifyVolumeRequest{
newSize: newSize,
}

err := d.addModifyVolumeRequest(ctx, volumeID, &modifyVolumeRequest)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err)
}

var actualSizeGiB int64

select {
case err := <-d.modifyVolumeManager.requestHandlerMap[volumeID].errChan:
d.modifyVolumeManager.deleteRequestHandler(volumeID)
return nil, status.Errorf(codes.Internal, "Could not resize volume %q: %v", volumeID, err)
case size := <-d.modifyVolumeManager.requestHandlerMap[volumeID].resultChan:
actualSizeGiB = size
}

d.modifyVolumeManager.deleteRequestHandler(volumeID)

nodeExpansionRequired := true
// if this is a raw block device, no expansion should be necessary on the node
cap := req.GetVolumeCapability()
Expand Down
164 changes: 160 additions & 4 deletions pkg/driver/controller_modify_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package driver

import (
"context"
"fmt"
"strconv"
"sync"
"time"

"github.com/awslabs/volume-modifier-for-k8s/pkg/rpc"
"github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud"
Expand All @@ -17,8 +20,147 @@ const (
ModificationKeyIOPS = "iops"

ModificationKeyThroughput = "throughput"

modifyVolumeRequestHandlerTimeout = 2 * time.Second
)

// modifyVolumeRequest describes one requested change to a volume: an optional
// new size plus optional disk property changes. The validate/merge helpers in
// this file treat zero-valued fields as "not requested".
type modifyVolumeRequest struct {
// newSize is the requested size; it is handed to cloud.ResizeOrModifyDisk as
// newSizeBytes. Zero means no resize was requested.
newSize int64
// modifyDiskOptions holds the requested IOPS, throughput and volume type.
modifyDiskOptions cloud.ModifyDiskOptions
}

// modifyVolumeRequestHandler holds the coalescing state for a single volume:
// the merged request built so far and the channels that connect the RPC
// handlers with the timer goroutine started by startVolumeTimer.
type modifyVolumeRequestHandler struct {
// volumeID identifies the volume this handler belongs to.
volumeID string
// Merged request from the requests that have been accepted for the volume
mergedRequest *modifyVolumeRequest
// Channel for sending requests to the Timer thread for the volume
requestChan chan *modifyVolumeRequest
// Channel for sending errors of ec2 ModifyVolume API call to the CSI Driver main thread
errChan chan error
// Channel for sending the actual volume size after the ec2 ModifyVolume API call to the CSI Driver main thread
resultChan chan int64
// mux guards mergedRequest: validateModifyVolumeRequest takes the read lock,
// mergeModifyVolumeRequest takes the write lock.
mux sync.RWMutex
}

// modifyVolumeManager tracks the request handler of every volume that
// currently has a pending resize/modify request.
type modifyVolumeManager struct {
// Map of volume ID to modifyVolumeRequestHandler
requestHandlerMap map[string]*modifyVolumeRequestHandler
// mux guards requestHandlerMap.
mux *sync.RWMutex
}

// newModifyVolumeManager builds a manager with an empty handler map and its
// own read/write lock.
func newModifyVolumeManager() *modifyVolumeManager {
	manager := new(modifyVolumeManager)
	manager.mux = new(sync.RWMutex)
	manager.requestHandlerMap = map[string]*modifyVolumeRequestHandler{}
	return manager
}

// deleteRequestHandler drops the handler registered for volumeID, if any,
// while holding the manager's write lock.
func (m *modifyVolumeManager) deleteRequestHandler(volumeID string) {
	m.mux.Lock()
	delete(m.requestHandlerMap, volumeID)
	m.mux.Unlock()
}

// validateModifyVolumeRequest checks an incoming request against the request
// already merged for this volume. A property conflicts when both requests set
// it (non-zero / non-empty) to different values; the first conflict found, in
// the order size, IOPS, throughput, volume type, is reported as an error.
// Properties set by only one side never conflict.
func (h *modifyVolumeRequestHandler) validateModifyVolumeRequest(r *modifyVolumeRequest) error {
	h.mux.RLock()
	defer h.mux.RUnlock()

	pending := h.mergedRequest
	requested := r.modifyDiskOptions

	switch {
	case r.newSize != 0 && pending.newSize != 0 && r.newSize != pending.newSize:
		return fmt.Errorf("Different size was requested by a previous request")
	case requested.IOPS != 0 && pending.modifyDiskOptions.IOPS != 0 && requested.IOPS != pending.modifyDiskOptions.IOPS:
		return fmt.Errorf("Different IOPS was requested by a previous request")
	case requested.Throughput != 0 && pending.modifyDiskOptions.Throughput != 0 && requested.Throughput != pending.modifyDiskOptions.Throughput:
		return fmt.Errorf("Different throughput was requested by a previous request")
	case requested.VolumeType != "" && pending.modifyDiskOptions.VolumeType != "" && requested.VolumeType != pending.modifyDiskOptions.VolumeType:
		return fmt.Errorf("Different volume type was requested by a previous request")
	}
	return nil
}

// mergeModifyVolumeRequest folds r into the handler's merged request under the
// write lock. Only properties that r actually sets (non-zero size, IOPS,
// throughput; non-empty volume type) overwrite the merged value.
func (h *modifyVolumeRequestHandler) mergeModifyVolumeRequest(r *modifyVolumeRequest) {
	h.mux.Lock()
	defer h.mux.Unlock()

	target := h.mergedRequest
	if size := r.newSize; size != 0 {
		target.newSize = size
	}
	if iops := r.modifyDiskOptions.IOPS; iops != 0 {
		target.modifyDiskOptions.IOPS = iops
	}
	if throughput := r.modifyDiskOptions.Throughput; throughput != 0 {
		target.modifyDiskOptions.Throughput = throughput
	}
	if volumeType := r.modifyDiskOptions.VolumeType; volumeType != "" {
		target.modifyDiskOptions.VolumeType = volumeType
	}
}

// addModifyVolumeRequest registers r for volumeID.
//
// If no handler exists for the volume, one is created with r as its initial
// merged request and a timer goroutine is started for it. Otherwise r is
// validated against the merged request and, if compatible, handed to the
// running timer goroutine to be merged.
//
// It returns the validation error when r conflicts with the merged request,
// or ctx.Err() when ctx ends before the timer goroutine accepts r.
func (d *controllerService) addModifyVolumeRequest(ctx context.Context, volumeID string, r *modifyVolumeRequest) error {
	manager := d.modifyVolumeManager

	// Look up and create under one write lock. Releasing a read lock before
	// taking the write lock would let two concurrent first requests each
	// install a handler, with the second overwriting the first.
	manager.mux.Lock()
	handler, exists := manager.requestHandlerMap[volumeID]
	if !exists {
		handler = &modifyVolumeRequestHandler{
			requestChan:   make(chan *modifyVolumeRequest),
			errChan:       make(chan error),
			resultChan:    make(chan int64),
			mergedRequest: r,
			volumeID:      volumeID,
		}
		manager.requestHandlerMap[volumeID] = handler
		manager.mux.Unlock()

		go d.startVolumeTimer(ctx, handler)
		return nil
	}
	manager.mux.Unlock()

	if err := handler.validateModifyVolumeRequest(r); err != nil {
		return err
	}

	// The timer goroutine may already have stopped reading requestChan, so
	// never block on the send without also watching the caller's context.
	select {
	case handler.requestChan <- r:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// startVolumeTimer runs the coalescing loop for one volume and is meant to be
// started as a goroutine.
//
// Every request received on h.requestChan is merged into h.mergedRequest and
// restarts the wait, because time.After is re-armed on each loop iteration.
// Once modifyVolumeRequestHandlerTimeout passes with no new request, the
// merged request is executed and its outcome is sent on h.errChan (failure)
// or h.resultChan (success, carrying the size returned by the cloud call).
//
// If ctx ends first, ctx.Err() is sent on h.errChan so that the caller waiting
// on the handler's channels is released instead of blocking forever.
func (d *controllerService) startVolumeTimer(ctx context.Context, h *modifyVolumeRequestHandler) {
	klog.V(4).InfoS("Starting ModifyVolume timer for ", "volumeID", h.volumeID)
	for {
		select {
		case req := <-h.requestChan:
			h.mergeModifyVolumeRequest(req)
		case <-time.After(modifyVolumeRequestHandlerTimeout):
			actualSizeGiB, err := d.executeModifyVolumeRequest(ctx, h.volumeID, h.mergedRequest)
			if err != nil {
				h.errChan <- err
				return
			}
			h.resultChan <- actualSizeGiB
			return
		case <-ctx.Done():
			// Report the cancellation rather than exiting silently.
			h.errChan <- ctx.Err()
			return
		}
	}
}

// executeModifyVolumeRequest forwards the request's size and disk options to
// cloud.ResizeOrModifyDisk. On success it returns the size reported by the
// cloud layer; on failure it returns 0 and a gRPC Internal status error.
func (d *controllerService) executeModifyVolumeRequest(ctx context.Context, volumeID string, req *modifyVolumeRequest) (int64, error) {
	sizeGiB, modifyErr := d.cloud.ResizeOrModifyDisk(ctx, volumeID, req.newSize, &req.modifyDiskOptions)
	if modifyErr == nil {
		return sizeGiB, nil
	}
	return 0, status.Errorf(codes.Internal, "Could not modify volume %q: %v", volumeID, modifyErr)
}

func (d *controllerService) GetCSIDriverModificationCapability(
_ context.Context,
_ *rpc.GetCSIDriverModificationCapabilityRequest,
Expand All @@ -30,8 +172,8 @@ func (d *controllerService) ModifyVolumeProperties(
ctx context.Context,
req *rpc.ModifyVolumePropertiesRequest,
) (*rpc.ModifyVolumePropertiesResponse, error) {
klog.V(4).InfoS("ModifyVolumeAttributes called", "req", req)
if err := validateModifyVolumeAttributesRequest(req); err != nil {
klog.V(4).InfoS("ModifyVolumeProperties called", "req", req)
if err := validateModifyVolumePropertiesRequest(req); err != nil {
return nil, err
}

Expand All @@ -55,13 +197,27 @@ func (d *controllerService) ModifyVolumeProperties(
modifyOptions.VolumeType = value
}
}
if err := d.cloud.ModifyDisk(ctx, name, &modifyOptions); err != nil {
modifyVolumeRequest := modifyVolumeRequest{
modifyDiskOptions: modifyOptions,
}
if err := d.addModifyVolumeRequest(ctx, name, &modifyVolumeRequest); err != nil {
return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err)
}

select {
case err := <-d.modifyVolumeManager.requestHandlerMap[name].errChan:
d.modifyVolumeManager.deleteRequestHandler(name)
return nil, status.Errorf(codes.Internal, "Could not modify volume %q: %v", name, err)
case <-d.modifyVolumeManager.requestHandlerMap[name].resultChan:
break
}

d.modifyVolumeManager.deleteRequestHandler(name)

return &rpc.ModifyVolumePropertiesResponse{}, nil
}

func validateModifyVolumeAttributesRequest(req *rpc.ModifyVolumePropertiesRequest) error {
func validateModifyVolumePropertiesRequest(req *rpc.ModifyVolumePropertiesRequest) error {
name := req.GetName()
if name == "" {
return status.Error(codes.InvalidArgument, "Volume name not provided")
Expand Down

0 comments on commit 427b0ce

Please sign in to comment.