Skip to content

Commit

Permalink
feat: delete pods in parallel to speed up retryworkflow (#12419)
Browse files Browse the repository at this point in the history
Signed-off-by: shuangkun <[email protected]>
Co-authored-by: sherwinkoo29 <[email protected]>
  • Loading branch information
shuangkun and sherwinkoo29 authored Jan 10, 2024
1 parent 3652241 commit 2bb770e
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"sort"
"sync"

log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -370,6 +371,15 @@ func (s *workflowServer) DeleteWorkflow(ctx context.Context, req *workflowpkg.Wo
return &workflowpkg.WorkflowDeleteResponse{}, nil
}

func errorFromChannel(errCh <-chan error) error {
select {
case err := <-errCh:
return err
default:
}
return nil
}

func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.WorkflowRetryRequest) (*wfv1.Workflow, error) {
wfClient := auth.GetWfClient(ctx)
kubeClient := auth.GetKubeClient(ctx)
Expand All @@ -394,12 +404,25 @@ func (s *workflowServer) RetryWorkflow(ctx context.Context, req *workflowpkg.Wor
return nil, sutils.ToStatusError(err, codes.Internal)
}

errCh := make(chan error, len(podsToDelete))
var wg sync.WaitGroup
wg.Add(len(podsToDelete))
for _, podName := range podsToDelete {
log.WithFields(log.Fields{"podDeleted": podName}).Info("Deleting pod")
err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
return nil, sutils.ToStatusError(err, codes.Internal)
}
go func(podName string) {
defer wg.Done()
err := kubeClient.CoreV1().Pods(wf.Namespace).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil && !apierr.IsNotFound(err) {
errCh <- err
return
}
}(podName)
}
wg.Wait()

err = errorFromChannel(errCh)
if err != nil {
return nil, sutils.ToStatusError(err, codes.Internal)
}

err = s.hydrator.Dehydrate(wf)
Expand Down

0 comments on commit 2bb770e

Please sign in to comment.