From 1c44f57d7217ac8eb24b24eadeb82b12b363af94 Mon Sep 17 00:00:00 2001 From: umagnus Date: Thu, 14 Mar 2024 08:13:20 +0000 Subject: [PATCH] add exec timeout func --- pkg/blob/controllerserver.go | 58 ++++++++++++++----------------- pkg/blob/controllerserver_test.go | 9 ++--- pkg/blob/volume_lock.go | 3 +- pkg/util/util.go | 28 +++++++++++++++ pkg/util/util_test.go | 52 +++++++++++++++++++++++++++ 5 files changed, 112 insertions(+), 38 deletions(-) diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 5dbe00477..ae469954e 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -61,8 +61,6 @@ const ( MSI = "MSI" SPN = "SPN" authorizationPermissionMismatch = "AuthorizationPermissionMismatch" - - waitForAzCopyInterval = 2 * time.Second ) // CreateVolume provisions a volume @@ -85,7 +83,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) // logging the job status if it's volume cloning if req.GetVolumeContentSource() != nil { jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{}) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err) } return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) } @@ -759,43 +757,41 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName) } - timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute) - timeTick := time.Tick(waitForAzCopyInterval) srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken) dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken) jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted { + switch jobState { + case util.AzcopyJobError, util.AzcopyJobCompleted: return err - } - klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName) - for { - select { - case <-timeTick: - jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - switch jobState { - case util.AzcopyJobError, util.AzcopyJobCompleted: - return err - case util.AzcopyJobNotFound: - klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName) - cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false") - if len(authAzcopyEnv) > 0 { - cmd.Env = append(os.Environ(), authAzcopyEnv...) - } - out, copyErr := cmd.CombinedOutput() - if copyErr != nil { - klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out)) - } else { - klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) - } - return copyErr + case util.AzcopyJobRunning: + return fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is %s%%", percent) + case util.AzcopyJobNotFound: + klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName) + execFunc := func() error { + cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false") + if len(authAzcopyEnv) > 0 { + cmd.Env = append(os.Environ(), authAzcopyEnv...) + } + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("exec error: %v, output: %v", err, string(out)) } - case <-timeAfter: - return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName) + return nil + } + timeoutFunc := func() error { + _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) + return fmt.Errorf("timeout waiting for copy blob container %s to %s complete, current copy percent: %s%%", srcContainerName, dstContainerName, percent) + } + copyErr := util.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, execFunc, timeoutFunc) + if copyErr != nil { + klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr) + } else { + klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) } + return copyErr } + return err } // copyVolume copies a volume form volume or snapshot, snapshot is not supported now diff --git a/pkg/blob/controllerserver_test.go b/pkg/blob/controllerserver_test.go index 22bc9179e..bb3a7a219 100644 --- a/pkg/blob/controllerserver_test.go +++ b/pkg/blob/controllerserver_test.go @@ -1712,7 +1712,7 @@ func TestCopyVolume(t *testing.T) { }, }, { - name: "azcopy job is first in progress and then be completed", + name: "azcopy job is in progress", testFunc: func(t *testing.T) { d := NewFakeDriver() mp := map[string]string{} @@ -1739,15 +1739,12 @@ func TestCopyVolume(t *testing.T) { m := util.NewMockEXEC(ctrl) listStr1 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: InProgress\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" - listStr2 := "JobId: ed1c3833-eaff-fe42-71d7-513fb065a9d9\nStart Time: Monday, 07-Aug-23 03:29:54 UTC\nStatus: Completed\nCommand: copy https://{accountName}.file.core.windows.net/{srcFileshare}{SAStoken} https://{accountName}.file.core.windows.net/{dstFileshare}{SAStoken} --recursive --check-length=false" - o1 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1) + m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr1, nil).Times(1) m.EXPECT().RunCommand(gomock.Not("azcopy jobs list | grep dstBlobContainer -B 3"), gomock.Any()).Return("Percent Complete (approx): 50.0", nil) - o2 := m.EXPECT().RunCommand(gomock.Eq("azcopy jobs list | grep dstContainer -B 3"), gomock.Any()).Return(listStr2, nil) - gomock.InOrder(o1, o2) d.azcopy.ExecCmd = m - var expectedErr error + expectedErr := fmt.Errorf("wait for the existing AzCopy job to complete, current copy percentage is 50.0%%") err := d.copyVolume(req, "sastoken", nil, "dstContainer", "core.windows.net") if !reflect.DeepEqual(err, expectedErr) { t.Errorf("Unexpected error: %v", err) diff --git a/pkg/blob/volume_lock.go b/pkg/blob/volume_lock.go index 2996d230d..95f779a9a 100644 --- a/pkg/blob/volume_lock.go +++ b/pkg/blob/volume_lock.go @@ -23,7 +23,8 @@ import ( ) const ( - volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" + volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" + volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs diff --git a/pkg/util/util.go b/pkg/util/util.go index 2cf64a99b..bb5e60052 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/go-ini/ini" "github.com/pkg/errors" @@ -387,3 +388,30 @@ func SetVolumeOwnership(path, gid, policy string) error { } return volume.SetVolumeOwnership(&VolumeMounter{path: path}, path, &gidInt64, &fsGroupChangePolicy, nil) } + +// ExecFunc returns a exec function's output and error +type ExecFunc func() (err error) + +// TimeoutFunc returns output and error if an ExecFunc timeout +type TimeoutFunc func() (err error) + +// WaitForExecCompletion waits for the exec function to complete or return timeout error +func WaitForExecCompletion(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error { + // Create a channel to receive the result of the azcopy exec function + done := make(chan bool) + var err error + + // Start the azcopy exec function in a goroutine + go func() { + err = execFunc() + done <- true + }() + + // Wait for the function to complete or time out + select { + case <-done: + return err + case <-time.After(timeout): + return timeoutFunc() + } +} diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go index 0e97a7b5f..3cec59e5a 100644 --- a/pkg/util/util_test.go +++ b/pkg/util/util_test.go @@ -656,3 +656,55 @@ func TestSetVolumeOwnership(t *testing.T) { } } } + +func TestWaitForExecCompletion(t *testing.T) { + tests := []struct { + desc string + timeout time.Duration + execFunc ExecFunc + timeoutFunc TimeoutFunc + expectedErr error + }{ + { + desc: "execFunc returns error", + timeout: 1 * time.Second, + execFunc: func() error { + return fmt.Errorf("execFunc error") + }, + timeoutFunc: func() error { + return fmt.Errorf("timeout error") + }, + expectedErr: fmt.Errorf("execFunc error"), + }, + { + desc: "execFunc timeout", + timeout: 1 * time.Second, + execFunc: func() error { + time.Sleep(2 * time.Second) + return nil + }, + timeoutFunc: func() error { + return fmt.Errorf("timeout error") + }, + expectedErr: fmt.Errorf("timeout error"), + }, + { + desc: "execFunc completed successfully", + timeout: 1 * time.Second, + execFunc: func() error { + return nil + }, + timeoutFunc: func() error { + return fmt.Errorf("timeout error") + }, + expectedErr: nil, + }, + } + + for _, test := range tests { + err := WaitForExecCompletion(test.timeout, test.execFunc, test.timeoutFunc) + if err != nil && (err.Error() != test.expectedErr.Error()) { + t.Errorf("unexpected error: %v, expected error: %v", err, test.expectedErr) + } + } +}