Skip to content

Commit

Permalink
Clear pending tasks in the worker when the context is canceled to avo…
Browse files Browse the repository at this point in the history
…id deadlocks in StopAndWait when tasks are queued for the worker.
  • Loading branch information
Corentin Clabaut committed Jun 7, 2024
1 parent b9d24f4 commit a1caa4e
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 8 deletions.
12 changes: 6 additions & 6 deletions pond.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
// Mark pool as stopped
atomic.StoreInt32(&p.stopped, 1)

// close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made)
p.tasksCloseOnce.Do(func() {
close(p.tasks)
})

if waitForQueuedTasksToComplete {
// Wait for all queued tasks to complete
p.tasksWaitGroup.Wait()
Expand All @@ -366,11 +371,6 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {

// Wait for all workers & purger goroutine to exit
p.workersWaitGroup.Wait()

// close tasks channel (only once, in case multiple concurrent calls to StopAndWait are made)
p.tasksCloseOnce.Do(func() {
close(p.tasks)
})
}

// purge represents the work done by the purger goroutine
Expand Down Expand Up @@ -420,7 +420,7 @@ func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {
}

// Launch worker goroutine
go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask)
go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask, &p.tasksWaitGroup)

return true
}
Expand Down
41 changes: 41 additions & 0 deletions pond_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,47 @@ func TestSubmitWithContext(t *testing.T) {
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
}

func TestSubmitWithContextCancelWithIdleTasks(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

pool := pond.New(1, 5, pond.Context(ctx))

var doneCount, taskCount int32

// Submit a long-running, cancellable task
pool.Submit(func() {
atomic.AddInt32(&taskCount, 1)
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
atomic.AddInt32(&doneCount, 1)
return
}
})

// Submit a long-running, cancellable task
pool.Submit(func() {
atomic.AddInt32(&taskCount, 1)
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Minute):
atomic.AddInt32(&doneCount, 1)
return
}
})

// Cancel the context
cancel()

pool.StopAndWait()

assertEqual(t, int32(1), atomic.LoadInt32(&taskCount))
assertEqual(t, int32(0), atomic.LoadInt32(&doneCount))
}

func TestConcurrentStopAndWait(t *testing.T) {

pool := pond.New(1, 5)
Expand Down
7 changes: 5 additions & 2 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

// worker represents a worker goroutine
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) {
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool), taskWaitGroup *sync.WaitGroup) {

// If provided, execute the first task immediately, before listening to the tasks channel
if firstTask != nil {
Expand All @@ -20,7 +20,10 @@ func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func()
for {
select {
case <-context.Done():
// Pool context was cancelled, exit
// Pool context was cancelled, empty tasks channel and exit
for _ = range tasks {
taskWaitGroup.Done()
}
return
case task, ok := <-tasks:
if task == nil || !ok {
Expand Down

0 comments on commit a1caa4e

Please sign in to comment.