From 7c73ba1b05df3f4a96126bbc98e7a035736cea88 Mon Sep 17 00:00:00 2001 From: umagnus Date: Thu, 14 Mar 2024 08:13:20 +0000 Subject: [PATCH] add exec timeout func --- pkg/blob/controllerserver.go | 56 ++++++++++++++++-------------------- pkg/blob/volume_lock.go | 3 +- pkg/util/util.go | 28 ++++++++++++++++++ 3 files changed, 55 insertions(+), 32 deletions(-) diff --git a/pkg/blob/controllerserver.go b/pkg/blob/controllerserver.go index 5dbe00477..31000da7b 100644 --- a/pkg/blob/controllerserver.go +++ b/pkg/blob/controllerserver.go @@ -61,8 +61,6 @@ const ( MSI = "MSI" SPN = "SPN" authorizationPermissionMismatch = "AuthorizationPermissionMismatch" - - waitForAzCopyInterval = 2 * time.Second ) // CreateVolume provisions a volume @@ -85,7 +83,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) // logging the job status if it's volume cloning if req.GetVolumeContentSource() != nil { jobState, percent, err := d.azcopy.GetAzcopyJob(volName, []string{}) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) + return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsWithAzcopyFmt, volName, jobState, percent, err) } return nil, status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, volName) } @@ -759,43 +757,39 @@ func (d *Driver) copyBlobContainer(req *csi.CreateVolumeRequest, accountSasToken return fmt.Errorf("srcContainerName(%s) or dstContainerName(%s) is empty", srcContainerName, dstContainerName) } - timeAfter := time.After(time.Duration(d.waitForAzCopyTimeoutMinutes) * time.Minute) - timeTick := time.Tick(waitForAzCopyInterval) srcPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, srcContainerName, accountSasToken) dstPath := fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, dstContainerName, accountSasToken) jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - if jobState == util.AzcopyJobError || jobState == util.AzcopyJobCompleted { + switch jobState { + case util.AzcopyJobError, util.AzcopyJobCompleted: return err - } - klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName) - for { - select { - case <-timeTick: - jobState, percent, err := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) - klog.V(2).Infof("azcopy job status: %s, copy percent: %s%%, error: %v", jobState, percent, err) - switch jobState { - case util.AzcopyJobError, util.AzcopyJobCompleted: - return err - case util.AzcopyJobNotFound: - klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName) - cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false") - if len(authAzcopyEnv) > 0 { - cmd.Env = append(os.Environ(), authAzcopyEnv...) - } - out, copyErr := cmd.CombinedOutput() - if copyErr != nil { - klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error(%v): %v", resourceGroupName, accountName, dstPath, copyErr, string(out)) - } else { - klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) - } - return copyErr + case util.AzcopyJobRunning: + return fmt.Errorf("an existed azcopy job is running, copy percent: %s%%, please wait for it to complete", percent) + case util.AzcopyJobNotFound: + klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName) + copyErr := util.WaitForExecCompletion(time.Duration(d.waitForAzCopyTimeoutMinutes)*time.Minute, func() error { + cmd := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false") + if len(authAzcopyEnv) > 0 { + cmd.Env = append(os.Environ(), authAzcopyEnv...) } - case <-timeAfter: - return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed", srcContainerName, dstContainerName) + if out, err := cmd.CombinedOutput(); err != nil { + return fmt.Errorf("exec error: %v, output: %v", err, string(out)) + } + return nil + }, func() error { + _, percent, _ := d.azcopy.GetAzcopyJob(dstContainerName, authAzcopyEnv) + return fmt.Errorf("timeout waiting for copy blob container %s to %s succeed, copy percent: %s%%", srcContainerName, dstContainerName, percent) + }) + if copyErr != nil { + klog.Warningf("CopyBlobContainer(%s, %s, %s) failed with error: %v", resourceGroupName, accountName, dstPath, copyErr) + } else { + klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName) } + return copyErr } + return err } // copyVolume copies a volume form volume or snapshot, snapshot is not supported now diff --git a/pkg/blob/volume_lock.go b/pkg/blob/volume_lock.go index 2996d230d..95f779a9a 100644 --- a/pkg/blob/volume_lock.go +++ b/pkg/blob/volume_lock.go @@ -23,7 +23,8 @@ import ( ) const ( - volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" + volumeOperationAlreadyExistsFmt = "An operation with the given Volume ID %s already exists" + volumeOperationAlreadyExistsWithAzcopyFmt = "An operation using azcopy with the given Volume ID %s already exists. Azcopy job status: %s, copy percent: %s%%, error: %v" ) // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs diff --git a/pkg/util/util.go b/pkg/util/util.go index 2cf64a99b..6a9ff07c2 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/go-ini/ini" "github.com/pkg/errors" @@ -387,3 +388,30 @@ func SetVolumeOwnership(path, gid, policy string) error { } return volume.SetVolumeOwnership(&VolumeMounter{path: path}, path, &gidInt64, &fsGroupChangePolicy, nil) } + +// ExecFunc returns a exec function's output and error +type ExecFunc func() (err error) + +// TimeoutFunc returns output and error if an ExecFunc timeout +type TimeoutFunc func() (err error) + +// WaitForExecCompletion waits for the exec function to complete or times out +func WaitForExecCompletion(timeout time.Duration, execFunc ExecFunc, timeoutFunc TimeoutFunc) error { + // Create a channel to receive the result of the azcopy exec function + done := make(chan bool) + var err error + + // Start the azcopy exec function in a goroutine + go func() { + err = execFunc() + done <- true + }() + + // Wait for the function to complete or time out + select { + case <-done: + return err + case <-time.After(timeout): + return timeoutFunc() + } +}