Skip to content

Commit

Permalink
feat(scaler.go): add max routine support to scaler
Browse files Browse the repository at this point in the history
This change also removes the mutexes that were guarding the step int for now
  • Loading branch information
benjivesterby committed Mar 27, 2023
1 parent 607b7cf commit 7d06b2a
Showing 1 changed file with 34 additions and 7 deletions.
41 changes: 34 additions & 7 deletions scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ type Scaler[T, U any] struct {
// that are CPU bound and need to scale up more/less quickly.
WaitModifier DurationScaler

// Max is the maximum number of layer2 routines that will be spawned.
// If Max is set to 0, then there is no limit.
Max int

wScale *DurationScaler
}

Expand Down Expand Up @@ -99,7 +103,15 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
ticker := time.NewTicker(s.Wait)
defer ticker.Stop()
step := 0
var stepMu sync.RWMutex

var max chan struct{}

if s.Max > 0 {
max = make(chan struct{}, s.Max)
if s.Max == 0 {
close(max) // no limit
}
}

scaleLoop:
for {
Expand All @@ -117,23 +129,40 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
case <-ctx.Done():
return
case <-ticker.C:
if s.Max != 0 {
select {
case <-ctx.Done():
return
case <-max: // start a new layer2 routine
default:
// wait for a layer2 routine to finish
continue l2loop
}
}

wgMu.Lock()
wg.Add(1)
wgMu.Unlock()

if !s.WaitModifier.inactive() {
stepMu.Lock()
step++
stepMu.Unlock()
}

go func() {
defer wg.Done()

if s.Max > 0 {
defer func() {
select {
case <-ctx.Done():
case max <- struct{}{}:
}
}()
}

if !s.WaitModifier.inactive() {
defer func() {
stepMu.Lock()
step--
stepMu.Unlock()
}()
}

Expand All @@ -144,11 +173,9 @@ func (s Scaler[T, U]) Exec(ctx context.Context, in <-chan T) (<-chan U, error) {
}
}

stepMu.RLock()
// Reset the ticker so that it does not immediately trip the
// case statement on loop.
ticker.Reset(s.wScale.scaledDuration(s.Wait, step))
stepMu.RUnlock()
}
}
}()
Expand Down

0 comments on commit 7d06b2a

Please sign in to comment.