Skip to content

Commit

Permalink
fix: Add QueuedChannel.CloseAndDiscardQueued
Browse files Browse the repository at this point in the history
Force exists the goroutine that populates the channel.
  • Loading branch information
LBeernaertProton committed Oct 21, 2022
1 parent ca3b090 commit 82064e3
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 4 deletions.
22 changes: 18 additions & 4 deletions queue/queued_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
// buffer size should be.
type QueuedChannel[T any] struct {
ch chan T
stopCh chan struct{}
items []T
cond *sync.Cond
closed atomicBool // Should use atomic.Bool once we use Go 1.19!
}

func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T] {
queue := &QueuedChannel[T]{
ch: make(chan T, chanBufferSize),
items: make([]T, 0, queueCapacity),
cond: sync.NewCond(&sync.Mutex{}),
ch: make(chan T, chanBufferSize),
stopCh: make(chan struct{}),
items: make([]T, 0, queueCapacity),
cond: sync.NewCond(&sync.Mutex{}),
}

// The queue is initially not closed.
Expand All @@ -33,7 +35,13 @@ func NewQueuedChannel[T any](chanBufferSize, queueCapacity int) *QueuedChannel[T
return
}

queue.ch <- item
select {
case queue.ch <- item:
continue

case <-queue.stopCh:
return
}
}
}()

Expand Down Expand Up @@ -68,6 +76,12 @@ func (q *QueuedChannel[T]) Close() {
q.cond.Broadcast()
}

// CloseAndDiscardQueued force closes the channel and does not guarantee that the remaining queued items will be read.
func (q *QueuedChannel[T]) CloseAndDiscardQueued() {
close(q.stopCh)
q.Close()
}

func (q *QueuedChannel[T]) pop() (T, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions queue/queued_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,15 @@ func TestQueuedChannel(t *testing.T) {
// Enqueuing more items after the queue is closed should return false.
require.False(t, queue.Enqueue(7, 8, 9))
}

func TestQueuedChannelDoesNotLeakIfThereAreNoReadersOnCloseAndDiscard(t *testing.T) {
defer goleak.VerifyNone(t)

// Create a new queued channel.
queue := NewQueuedChannel[int](1, 3)

// Push some items to the queue.
require.True(t, queue.Enqueue(1, 2, 3))

queue.CloseAndDiscardQueued()
}

0 comments on commit 82064e3

Please sign in to comment.