Skip to content

Commit

Permalink
use error group to concurrently execute the tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxinyi7 committed Aug 30, 2024
1 parent 7e26d8c commit f69c0f3
Showing 1 changed file with 18 additions and 20 deletions.
38 changes: 18 additions & 20 deletions control-plane/subcommand/sync-catalog/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/cli"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand Down Expand Up @@ -466,9 +467,8 @@ func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) e
return err
}

var wg sync.WaitGroup
var firstErr error
services := node.Services
errChan := make(chan error, 1)
batchSize := 300
maxRetries := 2
retryDelay := 200 * time.Millisecond
Expand All @@ -493,34 +493,32 @@ func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) e
end = len(services)
}

wg.Add(1)
go func(batch []*api.AgentService) {
defer wg.Done()

for _, service := range batch {
var eg errgroup.Group
for _, service := range services[i:end] {
s := service
eg.Go(func() error {
var b backoff.BackOff = backoff.NewConstantBackOff(retryDelay)
b = backoff.WithMaxRetries(b, uint64(maxRetries))
err := backoff.Retry(func() error {
return backoff.Retry(func() error {
_, err := consulClient.Catalog().Deregister(&api.CatalogDeregistration{
Node: c.flagPurgeK8SServicesFromNode,
ServiceID: service.ID,
ServiceID: s.ID,
}, nil)
return err
}, b)
if err != nil {
if len(errChan) == 0 {
errChan <- err
}
}
})
}
if err := eg.Wait(); err != nil {
if firstErr == nil {
c.UI.Info("Some K8S services were not deregistered from Consul")
firstErr = err
}
c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", len(batch), c.flagPurgeK8SServicesFromNode))
}(services[i:end])
wg.Wait()
}
c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", end-i, c.flagPurgeK8SServicesFromNode))
}

close(errChan)
if err = <-errChan; err != nil {
return err
if firstErr != nil {
return firstErr
}
c.UI.Info("All K8S services were deregistered from Consul")
return nil
Expand Down

0 comments on commit f69c0f3

Please sign in to comment.