Commit
Merge pull request #34 from alitto/issue/22
Prevent deadlock when purging idle worker during task submission
alitto authored Aug 28, 2022
2 parents d01340a + 7edfde9 commit c7a74b9
Showing 12 changed files with 145 additions and 80 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
@@ -10,7 +10,7 @@ jobs:
name: Test
strategy:
matrix:
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x]
go-version: [1.15.x, 1.16.x, 1.17.x, 1.18.x, 1.19.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
@@ -21,7 +21,7 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Test
run: go test -race -v ./
run: make test
codecov:
name: Upload coverage report to Codecov
runs-on: ubuntu-latest
@@ -33,9 +33,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v2
- name: Test
run: go test -race -v -coverprofile=coverage.txt -covermode=atomic ./
run: make coverage
- uses: codecov/codecov-action@v2
with:
files: ./coverage.txt
files: coverage.out
fail_ci_if_error: true
verbose: true
5 changes: 5 additions & 0 deletions Makefile
@@ -0,0 +1,5 @@
test:
go test -race -v ./

coverage:
go test -race -v -coverprofile=coverage.out -covermode=atomic ./
2 changes: 1 addition & 1 deletion examples/dynamic_size/go.mod
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/dynamic_size

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
2 changes: 1 addition & 1 deletion examples/fixed_size/go.mod
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/fixed_size

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
2 changes: 1 addition & 1 deletion examples/group_context/go.mod
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/group_context

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
2 changes: 1 addition & 1 deletion examples/pool_context/go.mod
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/pool_context

go 1.18
go 1.19

require github.com/alitto/pond v1.7.1

2 changes: 1 addition & 1 deletion examples/prometheus/go.mod
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/fixed_size

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
2 changes: 1 addition & 1 deletion examples/task_group/go.mod
@@ -1,6 +1,6 @@
module github.com/alitto/pond/examples/task_group

go 1.18
go 1.19

require (
github.com/alitto/pond v1.7.1
2 changes: 1 addition & 1 deletion go.mod
@@ -1,3 +1,3 @@
module github.com/alitto/pond

go 1.18
go 1.19
134 changes: 66 additions & 68 deletions pond.go
@@ -357,6 +357,9 @@ func (p *WorkerPool) stop(waitForQueuedTasksToComplete bool) {
p.tasksWaitGroup.Wait()
}

// Reset worker count
p.resetWorkerCount()

// Terminate all workers & purger goroutine
p.contextCancel()

@@ -380,42 +383,49 @@ func (p *WorkerPool) purge() {
select {
// Timed out waiting for any activity to happen, attempt to stop an idle worker
case <-idleTicker.C:
p.stopIdleWorker()
p.maybeStopIdleWorker()
// Pool context was cancelled, exit
case <-p.context.Done():
return
}
}
}

// stopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) stopIdleWorker() {
if p.IdleWorkers() > 0 && p.RunningWorkers() > p.minWorkers && !p.Stopped() {
p.tasks <- nil
// maybeStopIdleWorker attempts to stop an idle worker by sending it a nil task
func (p *WorkerPool) maybeStopIdleWorker() bool {

if decremented := p.decrementWorkerCount(); !decremented {
return false
}

// Send a nil task to stop an idle worker
p.tasks <- nil

return true
}

// maybeStartWorker attempts to create a new worker goroutine to run the given task.
// If the worker pool has reached the maximum number of workers or there are idle workers,
// it will not create a new worker.
// it will not create a new one.
func (p *WorkerPool) maybeStartWorker(firstTask func()) bool {

if ok := p.incrementWorkerCount(); !ok {
if incremented := p.incrementWorkerCount(); !incremented {
return false
}

if firstTask == nil {
// Worker starts idle
atomic.AddInt32(&p.idleWorkerCount, 1)
}

// Launch worker goroutine
go worker(p.context, firstTask, p.tasks, &p.idleWorkerCount, p.decrementWorkerCount, p.executeTask)
go worker(p.context, &p.workersWaitGroup, firstTask, p.tasks, p.executeTask)

return true
}

// executeTask executes the given task and updates task-related counters
func (p *WorkerPool) executeTask(task func()) {
func (p *WorkerPool) executeTask(task func(), isFirstTask bool) {

defer func() {
if panic := recover(); panic != nil {
@@ -424,60 +434,90 @@ func (p *WorkerPool) executeTask(task func()) {

// Invoke panic handler
p.panicHandler(panic)

// Increment idle count
atomic.AddInt32(&p.idleWorkerCount, 1)
}
p.tasksWaitGroup.Done()
}()

// Decrement idle count
if !isFirstTask {
atomic.AddInt32(&p.idleWorkerCount, -1)
}

// Decrement waiting task count
atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))

// Execute task
task()

// Increment successful task count
atomic.AddUint64(&p.successfulTaskCount, 1)

// Increment idle count
atomic.AddInt32(&p.idleWorkerCount, 1)
}

func (p *WorkerPool) incrementWorkerCount() (incremented bool) {
func (p *WorkerPool) incrementWorkerCount() bool {

p.mutex.Lock()
defer p.mutex.Unlock()

runningWorkerCount := p.RunningWorkers()

// Reached max workers, do not create a new one
if runningWorkerCount >= p.maxWorkers {
return
return false
}

// Idle workers available, do not create a new one
if runningWorkerCount >= p.minWorkers && runningWorkerCount > 0 && p.IdleWorkers() > 0 {
return
return false
}

// Execute the resizing strategy to determine if we should create more workers
if resize := p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers); !resize {
return false
}

// Increment worker count
atomic.AddInt32(&p.workerCount, 1)

// Increment wait group
p.workersWaitGroup.Add(1)

return true
}

func (p *WorkerPool) decrementWorkerCount() bool {

p.mutex.Lock()
defer p.mutex.Unlock()

// Execute the resizing strategy to determine if we can create more workers
incremented = p.strategy.Resize(runningWorkerCount, p.minWorkers, p.maxWorkers)
if p.IdleWorkers() <= 0 || p.RunningWorkers() <= p.minWorkers || p.Stopped() {
return false
}

if incremented {
// Increment worker count
atomic.AddInt32(&p.workerCount, 1)
// Decrement worker count
atomic.AddInt32(&p.workerCount, -1)

// Increment wait group
p.workersWaitGroup.Add(1)
}
// Decrement idle count
atomic.AddInt32(&p.idleWorkerCount, -1)

return
return true
}

func (p *WorkerPool) decrementWorkerCount() {
func (p *WorkerPool) resetWorkerCount() {

p.mutex.Lock()
defer p.mutex.Unlock()

// Decrement worker count
atomic.AddInt32(&p.workerCount, -1)
// Reset worker count
atomic.StoreInt32(&p.workerCount, 0)

// Decrement wait group
p.workersWaitGroup.Done()
// Reset idle count
atomic.StoreInt32(&p.idleWorkerCount, 0)
}

// Group creates a new task group
@@ -506,45 +546,3 @@ func (p *WorkerPool) GroupContext(ctx context.Context) (*TaskGroupWithContext, c
cancel: cancel,
}, ctx
}

// worker launches a worker goroutine
func worker(context context.Context, firstTask func(), tasks <-chan func(), idleWorkerCount *int32, exitHandler func(), taskExecutor func(func())) {

defer func() {
// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)

// Handle normal exit
exitHandler()
}()

// We have received a task, execute it
if firstTask != nil {
taskExecutor(firstTask)

// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}

for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to quit
return
}

// Decrement idle count
atomic.AddInt32(idleWorkerCount, -1)

// We have received a task, execute it
taskExecutor(task)

// Increment idle count
atomic.AddInt32(idleWorkerCount, 1)
}
}
}
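The pond.go changes above move all worker accounting into the pool itself: decrementWorkerCount and the new resetWorkerCount adjust workerCount and idleWorkerCount under the pool mutex, the nil poison pill is only sent after the counters have already been decremented, and executeTask now maintains the idle counter around each task instead of the worker goroutine doing it. A rough, hypothetical sketch of why that ordering matters follows; the names and the submit path are invented for illustration and are not pond's actual code, and the race description is inferred from the fix rather than stated in the diff. If the counters were only updated after the poison pill had been consumed, a concurrent submission could still observe an idle worker, decide not to start a new one, and hand its task to a worker that is already exiting.

package sketch

import "sync"

// miniPool is a simplified, hypothetical model of the ordering adopted by
// maybeStopIdleWorker/decrementWorkerCount: counters change under the lock
// first, and only then is the nil poison pill sent.
type miniPool struct {
	mu      sync.Mutex
	workers int
	idle    int
	max     int
	tasks   chan func()
}

// tryRetireIdleWorker updates the counters while holding the lock and only
// then sends the poison pill, so a concurrent submitter can never base its
// decision on a worker that is about to exit.
func (p *miniPool) tryRetireIdleWorker() bool {
	p.mu.Lock()
	if p.idle == 0 {
		p.mu.Unlock()
		return false
	}
	p.workers--
	p.idle--
	p.mu.Unlock()

	p.tasks <- nil // exactly one idle worker reads this and exits
	return true
}

// submit takes the same lock before deciding whether to start a worker or
// enqueue the task, so it always sees the already-decremented counters.
func (p *miniPool) submit(task func()) {
	p.mu.Lock()
	start := p.workers < p.max && p.idle == 0
	if start {
		p.workers++
	}
	p.mu.Unlock()

	if start {
		go func() {
			task()
			// a real worker would keep reading p.tasks here
		}()
		return
	}
	p.tasks <- task // an idle worker is known to exist and will pick this up
}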
29 changes: 28 additions & 1 deletion pond_test.go
@@ -44,5 +44,32 @@ func TestPurgeAfterPoolStopped(t *testing.T) {

// Simulate purger goroutine attempting to stop a worker after tasks channel is closed
atomic.StoreInt32(&pool.stopped, 1)
pool.stopIdleWorker()
pool.maybeStopIdleWorker()
}

// See: https://github.com/alitto/pond/issues/33
func TestPurgeDuringSubmit(t *testing.T) {

pool := New(1, 1)

var doneCount int32

// Submit a task to ensure at least 1 worker is started
pool.SubmitAndWait(func() {
atomic.AddInt32(&doneCount, 1)
})

assertEqual(t, 1, pool.IdleWorkers())

// Stop an idle worker right before submitting another task
pool.maybeStopIdleWorker()
pool.Submit(func() {
atomic.AddInt32(&doneCount, 1)
})

pool.StopAndWait()

assertEqual(t, int32(2), atomic.LoadInt32(&doneCount))
assertEqual(t, 0, pool.RunningWorkers())

}
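The new TestPurgeDuringSubmit reproduces issue #33 by retiring the only idle worker immediately before a Submit. For reference, here is a hypothetical standalone program exercising the same sequence from the caller's side, using only API calls that appear in this test (pond.New, SubmitAndWait, Submit, StopAndWait); with this fix the second submission still completes and StopAndWait returns.

package main

import (
	"fmt"

	"github.com/alitto/pond"
)

func main() {
	// Pool configured like the regression test's New(1, 1).
	pool := pond.New(1, 1)

	// First task guarantees one worker has been started and is now idle.
	pool.SubmitAndWait(func() {
		fmt.Println("first task done")
	})

	// Second task may race with the purger retiring the idle worker;
	// after this commit it is still executed rather than deadlocking.
	pool.Submit(func() {
		fmt.Println("second task done")
	})

	// Wait for queued tasks to finish and for all workers to exit.
	pool.StopAndWait()
}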
35 changes: 35 additions & 0 deletions worker.go
@@ -0,0 +1,35 @@
package pond

import (
"context"
"sync"
)

// worker represents a worker goroutine
func worker(context context.Context, waitGroup *sync.WaitGroup, firstTask func(), tasks <-chan func(), taskExecutor func(func(), bool)) {

// If provided, execute the first task immediately, before listening to the tasks channel
if firstTask != nil {
taskExecutor(firstTask, true)
}

defer func() {
waitGroup.Done()
}()

for {
select {
case <-context.Done():
// Pool context was cancelled, exit
return
case task, ok := <-tasks:
if task == nil || !ok {
// We have received a signal to exit
return
}

// We have received a task, execute it
taskExecutor(task, false)
}
}
}
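
The worker loop previously embedded in pond.go now lives in worker.go with a narrower contract: it no longer touches any counters itself, it marks the wait group done on exit, and it treats a nil task (or a closed channel, or context cancellation) as the signal to stop. Below is a hypothetical white-box test sketch, not part of this commit, showing how the function can be driven directly from inside the package; the wait-group entry is added before launch, mirroring incrementWorkerCount, and the nil task acts as the poison pill.

package pond

import (
	"context"
	"sync"
	"testing"
)

func TestWorkerStopsOnNilTask(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tasks := make(chan func(), 1)
	executed := 0

	executor := func(task func(), isFirstTask bool) {
		task()
		executed++
	}

	var wg sync.WaitGroup
	wg.Add(1) // mirrors the Add(1) performed by incrementWorkerCount

	// Launch the worker with a first task, as maybeStartWorker does.
	go worker(ctx, &wg, func() {}, tasks, executor)

	// A nil task is the poison pill: the worker returns and calls wg.Done().
	tasks <- nil
	wg.Wait()

	if executed != 1 {
		t.Fatalf("expected 1 executed task, got %d", executed)
	}
}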
