Skip to content

Commit

Permalink
fix: not owned stream count (#13030)
Browse files Browse the repository at this point in the history
Signed-off-by: Vladyslav Diachenko <[email protected]>
  • Loading branch information
vlad-diachenko authored May 24, 2024
1 parent 8101e21 commit 4901a5c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 12 deletions.
4 changes: 2 additions & 2 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {
expected error
useOwnedStreamService bool
fixedLimit int32
ownedStreamCount int64
ownedStreamCount int
}{
"both local and global limit are disabled": {
maxLocalStreamsPerUser: 0,
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestStreamCountLimiter_AssertNewStreamAllowed(t *testing.T) {

ownedStreamSvc := &ownedStreamService{
fixedLimit: atomic.NewInt32(testData.fixedLimit),
ownedStreamCount: atomic.NewInt64(testData.ownedStreamCount),
ownedStreamCount: testData.ownedStreamCount,
}
limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
defaultCountSupplier := func() int {
Expand Down
33 changes: 24 additions & 9 deletions pkg/ingester/owned_streams.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,36 @@
package ingester

import "go.uber.org/atomic"
import (
"sync"

"go.uber.org/atomic"
)

type ownedStreamService struct {
tenantID string
limiter *Limiter
fixedLimit *atomic.Int32

//todo: implement job to recalculate it
ownedStreamCount *atomic.Int64
ownedStreamCount int
notOwnedStreamCount int
lock sync.RWMutex
}

func newOwnedStreamService(tenantID string, limiter *Limiter) *ownedStreamService {
svc := &ownedStreamService{
tenantID: tenantID,
limiter: limiter,
ownedStreamCount: atomic.NewInt64(0),
fixedLimit: atomic.NewInt32(0),
tenantID: tenantID,
limiter: limiter,
fixedLimit: atomic.NewInt32(0),
}
svc.updateFixedLimit()
return svc
}

func (s *ownedStreamService) getOwnedStreamCount() int {
return int(s.ownedStreamCount.Load())
s.lock.RLock()
defer s.lock.RUnlock()
return s.ownedStreamCount
}

func (s *ownedStreamService) updateFixedLimit() {
Expand All @@ -36,9 +43,17 @@ func (s *ownedStreamService) getFixedLimit() int {
}

func (s *ownedStreamService) incOwnedStreamCount() {
s.ownedStreamCount.Inc()
s.lock.Lock()
defer s.lock.Unlock()
s.ownedStreamCount++
}

func (s *ownedStreamService) decOwnedStreamCount() {
s.ownedStreamCount.Dec()
s.lock.Lock()
defer s.lock.Unlock()
if s.notOwnedStreamCount > 0 {
s.notOwnedStreamCount--
return
}
s.ownedStreamCount--
}
32 changes: 31 additions & 1 deletion pkg/ingester/owned_streams_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingester

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -29,8 +30,37 @@ func Test_OwnedStreamService(t *testing.T) {

service.incOwnedStreamCount()
service.incOwnedStreamCount()
require.Equal(t, 2, service.getOwnedStreamCount())
service.incOwnedStreamCount()
require.Equal(t, 3, service.getOwnedStreamCount())

// simulate the effect from the recalculation job
service.notOwnedStreamCount = 1
service.ownedStreamCount = 2

service.decOwnedStreamCount()
require.Equal(t, 2, service.getOwnedStreamCount(), "owned stream count must be decremented only when notOwnedStreamCount is set to 0")
require.Equal(t, 0, service.notOwnedStreamCount)

service.decOwnedStreamCount()
require.Equal(t, 1, service.getOwnedStreamCount())
require.Equal(t, 0, service.notOwnedStreamCount, "notOwnedStreamCount must not be decremented lower than 0")

group := sync.WaitGroup{}
group.Add(200)
for i := 0; i < 100; i++ {
go func() {
defer group.Done()
service.incOwnedStreamCount()
}()
}

for i := 0; i < 100; i++ {
go func() {
defer group.Done()
service.decOwnedStreamCount()
}()
}
group.Wait()

require.Equal(t, 1, service.getOwnedStreamCount(), "owned stream count must not be changed")
}

0 comments on commit 4901a5c

Please sign in to comment.