Skip to content
This repository has been archived by the owner on Aug 12, 2022. It is now read-only.

Commit

Permalink
fix: Use errgroup SetLimit (#363)
Browse files Browse the repository at this point in the history
  • Loading branch information
erezrokah authored Jun 21, 2022
1 parent ed28f11 commit 964a1bb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
7 changes: 7 additions & 0 deletions helpers/integers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,10 @@ func Uint64ToInt64(i uint64) int64 {
}
return int64(i)
}

func Uint64ToInt(i uint64) int {
if i > math.MaxInt {
return math.MaxInt
}
return int(i)
}
18 changes: 3 additions & 15 deletions provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,10 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes
p.Logger.Info("calculated max goroutines for fetch execution", "max_goroutines", maxGoroutines)
goroutinesSem = semaphore.NewWeighted(helpers.Uint64ToInt64(maxGoroutines))

// limiter used to limit the amount of resources fetched concurrently
var parallelResourceSem *semaphore.Weighted
maxParallelFetchingLimit := request.ParallelFetchingLimit
if maxParallelFetchingLimit > 0 {
parallelResourceSem = semaphore.NewWeighted(helpers.Uint64ToInt64(maxParallelFetchingLimit))
}

g, gctx := errgroup.WithContext(ctx)
if request.ParallelFetchingLimit > 0 {
g.SetLimit(helpers.Uint64ToInt(request.ParallelFetchingLimit))
}
finishedResources := make(map[string]bool, len(resources))
l := &sync.Mutex{}
var totalResourceCount uint64
Expand All @@ -276,15 +272,7 @@ func (p *Provider) FetchResources(ctx context.Context, request *cqproto.FetchRes
l.Lock()
finishedResources[r] = false
l.Unlock()
if parallelResourceSem != nil {
if err := parallelResourceSem.Acquire(ctx, 1); err != nil {
return err
}
}
g.Go(func() error {
if parallelResourceSem != nil {
defer parallelResourceSem.Release(1)
}
resourceCount, diags := tableExec.Resolve(gctx, p.meta)
l.Lock()
defer l.Unlock()
Expand Down

0 comments on commit 964a1bb

Please sign in to comment.