diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index f0aff469b4..4a651779fe 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -27,6 +27,7 @@ import ( csi "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -56,6 +57,7 @@ var ( // controllerService represents the controller service of CSI driver type controllerService struct { cloud cloud.Cloud + inFlight *internal.InFlight driverOptions *DriverOptions } @@ -87,6 +89,7 @@ func newControllerService(driverOptions *DriverOptions) controllerService { return controllerService{ cloud: cloud, + inFlight: internal.NewInFlight(), driverOptions: driverOptions, } } @@ -201,6 +204,13 @@ func (d *controllerService) CreateVolume(ctx context.Context, req *csi.CreateVol return newCreateVolumeResponse(disk), nil } + // check if a request is already in-flight because the CreateVolume API is not idempotent + if ok := d.inFlight.Insert(req); !ok { + msg := fmt.Sprintf("Create volume request for %s is already in progress", volName) + return nil, status.Error(codes.AlreadyExists, msg) + } + defer d.inFlight.Delete(req) + // create a new volume zone := pickAvailabilityZone(req.GetAccessibilityRequirements()) outpostArn := getOutpostArn(req.GetAccessibilityRequirements()) diff --git a/pkg/driver/controller_test.go b/pkg/driver/controller_test.go index aa16c7cdcd..78351b4214 100644 --- a/pkg/driver/controller_test.go +++ b/pkg/driver/controller_test.go @@ -31,6 +31,7 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/mock/gomock" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/cloud" + "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/internal" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/driver/mocks" "github.com/kubernetes-sigs/aws-ebs-csi-driver/pkg/util" "google.golang.org/grpc/codes" @@ -198,6 +199,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -268,6 +270,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -335,6 +338,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -390,6 +394,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -445,6 +450,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -472,6 +478,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -526,6 +533,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -612,6 +620,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -675,6 +684,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -736,6 +746,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -792,6 +803,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -834,6 +846,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -876,6 +889,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -917,6 +931,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -958,6 +973,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -999,6 +1015,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1041,6 +1058,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1076,6 +1094,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1116,6 +1135,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1156,6 +1176,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1230,6 +1251,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1309,7 +1331,8 @@ func TestCreateVolume(t *testing.T) { mockCloud.EXPECT().CreateDisk(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(diskOptions)).Return(mockDisk, nil) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ extraTags: map[string]string{ extraVolumeTagKey: extraVolumeTagValue, @@ -1370,7 +1393,8 @@ func TestCreateVolume(t *testing.T) { mockCloud.EXPECT().CreateDisk(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(diskOptions)).Return(mockDisk, nil) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ kubernetesClusterID: clusterID, }, @@ -1437,6 +1461,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1472,6 +1497,7 @@ func TestCreateVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -1489,6 +1515,48 @@ func TestCreateVolume(t *testing.T) { } }, }, + { + name: "fail with in-flight request", + testFunc: func(t *testing.T) { + req := &csi.CreateVolumeRequest{ + Name: "random-vol-name", + CapacityRange: stdCapRange, + VolumeCapabilities: stdVolCap, + Parameters: nil, + } + + ctx := context.Background() + + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockCloud := mocks.NewMockCloud(mockCtl) + mockCloud.EXPECT().GetDiskByName(gomock.Eq(ctx), gomock.Eq(req.Name), gomock.Eq(stdVolSize)).Return(nil, cloud.ErrNotFound) + + inFlight := internal.NewInFlight() + inFlight.Insert(req) + defer inFlight.Delete(req) + + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: inFlight, + driverOptions: &DriverOptions{}, + } + + _, err := awsDriver.CreateVolume(ctx, req) + if err == nil { + t.Fatalf("Expected CreateVolume to fail but got no error") + } + + srvErr, ok := status.FromError(err) + if !ok { + t.Fatalf("Could not get error status code from error: %v", srvErr) + } + if srvErr.Code() != codes.AlreadyExists { + t.Fatalf("Expected AlreadyExists but got: %s", srvErr.Code()) + } + }, + }, } for _, tc := range testCases { @@ -1517,6 +1585,7 @@ func TestDeleteVolume(t *testing.T) { mockCloud.EXPECT().DeleteDisk(gomock.Eq(ctx), gomock.Eq(req.VolumeId)).Return(true, nil) awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.DeleteVolume(ctx, req) @@ -1548,6 +1617,7 @@ func TestDeleteVolume(t *testing.T) { mockCloud.EXPECT().DeleteDisk(gomock.Eq(ctx), gomock.Eq(req.VolumeId)).Return(false, cloud.ErrNotFound) awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.DeleteVolume(ctx, req) @@ -1578,6 +1648,7 @@ func TestDeleteVolume(t *testing.T) { mockCloud.EXPECT().DeleteDisk(gomock.Eq(ctx), gomock.Eq(req.VolumeId)).Return(false, fmt.Errorf("DeleteDisk could not delete volume")) awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.DeleteVolume(ctx, req) @@ -1837,6 +1908,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.CreateSnapshot(context.Background(), req) @@ -1891,7 +1963,8 @@ func TestCreateSnapshot(t *testing.T) { mockCloud.EXPECT().GetSnapshotByName(gomock.Eq(ctx), gomock.Eq(req.GetName())).Return(nil, cloud.ErrNotFound) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ kubernetesClusterID: clusterID, }, @@ -1944,7 +2017,8 @@ func TestCreateSnapshot(t *testing.T) { mockCloud.EXPECT().GetSnapshotByName(gomock.Eq(ctx), gomock.Eq(req.GetName())).Return(nil, cloud.ErrNotFound) awsDriver := controllerService{ - cloud: mockCloud, + cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{ extraTags: map[string]string{ extraVolumeTagKey: extraVolumeTagValue, @@ -1976,6 +2050,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } if _, err := awsDriver.CreateSnapshot(context.Background(), req); err != nil { @@ -2024,6 +2099,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.CreateSnapshot(context.Background(), req) @@ -2090,6 +2166,7 @@ func TestCreateSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } resp, err := awsDriver.CreateSnapshot(context.Background(), req) @@ -2131,6 +2208,7 @@ func TestDeleteSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2155,6 +2233,7 @@ func TestDeleteSnapshot(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2211,6 +2290,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2237,6 +2317,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2272,6 +2353,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2301,6 +2383,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2330,6 +2413,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2361,6 +2445,7 @@ func TestListSnapshots(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2423,6 +2508,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2453,7 +2539,11 @@ func TestControllerPublishVolume(t *testing.T) { mockCloud := mocks.NewMockCloud(mockCtl) mockCloud.EXPECT().DetachDisk(gomock.Eq(ctx), req.VolumeId, req.NodeId).Return(cloud.ErrNotFound) - awsDriver := controllerService{cloud: mockCloud} + awsDriver := controllerService{ + cloud: mockCloud, + inFlight: internal.NewInFlight(), + driverOptions: &DriverOptions{}, + } resp, err := awsDriver.ControllerUnpublishVolume(ctx, req) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -2478,6 +2568,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2510,6 +2601,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2543,6 +2635,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2581,6 +2674,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2616,6 +2710,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2652,6 +2747,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2693,6 +2789,7 @@ func TestControllerPublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2743,6 +2840,7 @@ func TestControllerUnpublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2770,6 +2868,7 @@ func TestControllerUnpublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2802,6 +2901,7 @@ func TestControllerUnpublishVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } @@ -2881,6 +2981,7 @@ func TestControllerExpandVolume(t *testing.T) { awsDriver := controllerService{ cloud: mockCloud, + inFlight: internal.NewInFlight(), driverOptions: &DriverOptions{}, } diff --git a/pkg/driver/node.go b/pkg/driver/node.go index 96f68f3cec..0a78d8914b 100644 --- a/pkg/driver/node.go +++ b/pkg/driver/node.go @@ -143,12 +143,11 @@ func (d *nodeService) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol if ok := d.inFlight.Insert(req); !ok { msg := fmt.Sprintf("request to stage volume=%q is already in progress", volumeID) - return nil, status.Error(codes.Internal, msg) + return nil, status.Error(codes.AlreadyExists, msg) } defer func() { klog.V(4).Infof("NodeStageVolume: volume=%q operation finished", req.GetVolumeId()) d.inFlight.Delete(req) - klog.V(4).Info("donedone") }() devicePath, ok := req.PublishContext[DevicePathKey] diff --git a/pkg/driver/node_test.go b/pkg/driver/node_test.go index 3a2572db19..d914d4aec5 100644 --- a/pkg/driver/node_test.go +++ b/pkg/driver/node_test.go @@ -477,6 +477,40 @@ func TestNodeStageVolume(t *testing.T) { } }, }, + { + name: "fail with in-flight request", + testFunc: func(t *testing.T) { + mockCtl := gomock.NewController(t) + defer mockCtl.Finish() + + mockMetadata := mocks.NewMockMetadataService(mockCtl) + mockMounter := mocks.NewMockMounter(mockCtl) + + req := &csi.NodeStageVolumeRequest{ + PublishContext: map[string]string{DevicePathKey: devicePath}, + StagingTargetPath: targetPath, + VolumeCapability: stdVolCap, + VolumeId: "vol-test", + } + + inFlight := internal.NewInFlight() + inFlight.Insert(req) + defer inFlight.Delete(req) + + awsDriver := &nodeService{ + metadata: mockMetadata, + mounter: mockMounter, + inFlight: inFlight, + } + + _, err := awsDriver.NodeStageVolume(context.TODO(), req) + if err == nil { + t.Fatalf("Expect error but got no error") + } + + expectErr(t, err, codes.AlreadyExists) + }, + }, } for _, tc := range testCases { diff --git a/pkg/driver/sanity_test.go b/pkg/driver/sanity_test.go index e2bf9504ad..203608d093 100644 --- a/pkg/driver/sanity_test.go +++ b/pkg/driver/sanity_test.go @@ -47,6 +47,7 @@ func TestSanity(t *testing.T) { options: driverOptions, controllerService: controllerService{ cloud: newFakeCloudProvider(), + inFlight: internal.NewInFlight(), driverOptions: driverOptions, }, nodeService: nodeService{