Skip to content

Commit

Permalink
fix: use sync.Cond to handle no-task blocking wait (#299)
Browse files Browse the repository at this point in the history
Ref: #284
  • Loading branch information
rvagg authored Dec 4, 2021
1 parent a511707 commit 83aebf1
Showing 1 changed file with 15 additions and 15 deletions.
30 changes: 15 additions & 15 deletions taskqueue/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package taskqueue

import (
"context"
"sync/atomic"
"sync"
"time"

"github.com/ipfs/go-peertaskqueue"
Expand Down Expand Up @@ -33,7 +33,7 @@ type WorkerTaskQueue struct {
cancelFn func()
peerTaskQueue *peertaskqueue.PeerTaskQueue
workSignal chan struct{}
noTaskSignal chan struct{}
noTaskCond *sync.Cond
ticker *time.Ticker
activeTasks int32
}
Expand All @@ -46,7 +46,7 @@ func NewTaskQueue(ctx context.Context) *WorkerTaskQueue {
cancelFn: cancelFn,
peerTaskQueue: peertaskqueue.New(),
workSignal: make(chan struct{}, 1),
noTaskSignal: make(chan struct{}, 1),
noTaskCond: sync.NewCond(&sync.Mutex{}),
ticker: time.NewTicker(thawSpeed),
}
}
Expand Down Expand Up @@ -93,13 +93,11 @@ func (tq *WorkerTaskQueue) Shutdown() {
}

func (tq *WorkerTaskQueue) WaitForNoActiveTasks() {
for atomic.LoadInt32(&tq.activeTasks) > 0 {
select {
case <-tq.ctx.Done():
return
case <-tq.noTaskSignal:
}
tq.noTaskCond.L.Lock()
for tq.activeTasks > 0 {
tq.noTaskCond.Wait()
}
tq.noTaskCond.L.Unlock()
}

func (tq *WorkerTaskQueue) worker(executor Executor) {
Expand All @@ -118,14 +116,16 @@ func (tq *WorkerTaskQueue) worker(executor Executor) {
}
}
for _, task := range tasks {
atomic.AddInt32(&tq.activeTasks, 1)
tq.noTaskCond.L.Lock()
tq.activeTasks = tq.activeTasks + 1
tq.noTaskCond.L.Unlock()
terminate := executor.ExecuteTask(tq.ctx, pid, task)
if atomic.AddInt32(&tq.activeTasks, -1) == 0 {
select {
case tq.noTaskSignal <- struct{}{}:
default:
}
tq.noTaskCond.L.Lock()
tq.activeTasks = tq.activeTasks - 1
if tq.activeTasks == 0 {
tq.noTaskCond.Broadcast()
}
tq.noTaskCond.L.Unlock()
if terminate {
return
}
Expand Down

0 comments on commit 83aebf1

Please sign in to comment.