diff --git a/scaler.go b/scaler.go index 70b7547..0833edb 100644 --- a/scaler.go +++ b/scaler.go @@ -42,6 +42,10 @@ type Scaler[T, U any] struct { // that are CPU bound and need to scale up more/less quickly. WaitModifier DurationScaler + // Max is the maximum number of layer2 routines that will be spawned. + // If Max is set to 0, then there is no limit. + Max int + wScale *DurationScaler } @@ -99,7 +103,15 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { ticker := time.NewTicker(s.Wait) defer ticker.Stop() step := 0 - var stepMu sync.RWMutex + + var max chan struct{} + + if s.Max > 0 { + max = make(chan struct{}, s.Max) + if s.Max == 0 { + close(max) // no limit + } + } scaleLoop: for { @@ -117,23 +129,40 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { case <-ctx.Done(): return case <-ticker.C: + if s.Max != 0 { + select { + case <-ctx.Done(): + return + case <-max: // start a new layer2 routine + default: + // wait for a layer2 routine to finish + continue l2loop + } + } + wgMu.Lock() wg.Add(1) wgMu.Unlock() if !s.WaitModifier.inactive() { - stepMu.Lock() step++ - stepMu.Unlock() } go func() { defer wg.Done() + + if s.Max > 0 { + defer func() { + select { + case <-ctx.Done(): + case max <- struct{}{}: + } + }() + } + if !s.WaitModifier.inactive() { defer func() { - stepMu.Lock() step-- - stepMu.Unlock() }() } @@ -144,11 +173,9 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) { } } - stepMu.RLock() // Reset the ticker so that it does not immediately trip the // case statement on loop. ticker.Reset(s.wScale.scaledDuration(s.Wait, step)) - stepMu.RUnlock() } } }()