Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make WrappedQueues and PersistableChannelUniqueQueues Pausable #18393

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func RegisteredTypesAsString() []string {
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
newFn, ok := queuesMap[queueType]
if !ok {
return nil, fmt.Errorf("Unsupported queue type: %v", queueType)
return nil, fmt.Errorf("unsupported queue type: %v", queueType)
}
return newFn(handlerFunc, opts, exemplar)
}
6 changes: 3 additions & 3 deletions modules/queue/queue_bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (q *ByteFIFOQueue) Push(data Data) error {
// PushBack pushes data to the fifo
func (q *ByteFIFOQueue) PushBack(data Data) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
Expand All @@ -110,7 +110,7 @@ func (q *ByteFIFOQueue) PushBack(data Data) error {
// PushFunc pushes data to the fifo
func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
Expand Down Expand Up @@ -398,7 +398,7 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
// Has checks if the provided data is in the queue
func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) {
if !assignableTo(data, q.exemplar) {
return false, fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
return false, fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}
bs, err := json.Marshal(data)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion modules/queue/queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) {
// Push will push data into the queue
func (q *ChannelQueue) Push(data Data) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}
q.WorkerPool.Push(data)
return nil
Expand Down
46 changes: 43 additions & 3 deletions modules/queue/queue_wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
if s, ok := cfg.([]byte); ok {
cfg = string(s)
}
return fmt.Errorf("Timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
return fmt.Errorf("timedout creating queue %v with cfg %#v in %s", q.underlying, cfg, q.name)
default:
queue, err := NewQueue(q.underlying, handle, q.cfg, exemplar)
if err == nil {
Expand All @@ -76,9 +76,9 @@ func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc
i++
if q.maxAttempts > 0 && i > q.maxAttempts {
if bs, ok := q.cfg.([]byte); ok {
return fmt.Errorf("Unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
return fmt.Errorf("unable to create queue %v for %s with cfg %s by max attempts: error: %v", q.underlying, q.name, string(bs), err)
}
return fmt.Errorf("Unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
return fmt.Errorf("unable to create queue %v for %s with cfg %#v by max attempts: error: %v", q.underlying, q.name, q.cfg, err)
}
sleepTime := 100 * time.Millisecond
if q.timeout > 0 && q.maxAttempts > 0 {
Expand Down Expand Up @@ -271,6 +271,46 @@ func (q *WrappedQueue) Terminate() {
log.Debug("WrappedQueue: %s Terminated", q.name)
}

// IsPaused will return if the pool or queue is paused
func (q *WrappedQueue) IsPaused() bool {
q.lock.Lock()
defer q.lock.Unlock()
pausable, ok := q.internal.(Pausable)
return ok && pausable.IsPaused()
lafriks marked this conversation as resolved.
Show resolved Hide resolved
}

// Pause will pause the pool or queue
func (q *WrappedQueue) Pause() {
q.lock.Lock()
defer q.lock.Unlock()
if pausable, ok := q.internal.(Pausable); ok {
pausable.Pause()
}
}

// Resume will resume the pool or queue
func (q *WrappedQueue) Resume() {
q.lock.Lock()
defer q.lock.Unlock()
if pausable, ok := q.internal.(Pausable); ok {
pausable.Resume()
}
}

// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
func (q *WrappedQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
q.lock.Lock()
defer q.lock.Unlock()
if pausable, ok := q.internal.(Pausable); ok {
return pausable.IsPausedIsResumed()
}
return context.Background().Done(), closedChan
}

var closedChan chan struct{}

func init() {
queuesMap[WrappedQueueType] = NewWrappedQueue
closedChan = make(chan struct{})
close(closedChan)
}
2 changes: 1 addition & 1 deletion modules/queue/setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func validType(t string) (Type, error) {
return typ, nil
}
}
return PersistableChannelQueueType, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
return PersistableChannelQueueType, fmt.Errorf("unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
}

func getQueueSettings(name string) (setting.QueueSettings, []byte) {
Expand Down
2 changes: 1 addition & 1 deletion modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (q *ChannelUniqueQueue) Push(data Data) error {
// PushFunc will push data into the queue
func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}

bs, err := json.Marshal(data)
Expand Down
20 changes: 20 additions & 0 deletions modules/queue/unique_queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,26 @@ func (q *PersistableChannelUniqueQueue) IsEmpty() bool {
return q.channelQueue.IsEmpty()
}

// IsPaused will return if the pool or queue is paused
func (q *PersistableChannelUniqueQueue) IsPaused() bool {
return q.channelQueue.IsPaused()
}

// Pause will pause the pool or queue
func (q *PersistableChannelUniqueQueue) Pause() {
q.channelQueue.Pause()
}

// Resume will resume the pool or queue
func (q *PersistableChannelUniqueQueue) Resume() {
q.channelQueue.Resume()
}

// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
func (q *PersistableChannelUniqueQueue) IsPausedIsResumed() (paused, resumed <-chan struct{}) {
return q.channelQueue.IsPausedIsResumed()
}

// Shutdown processing this queue
func (q *PersistableChannelUniqueQueue) Shutdown() {
log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name)
Expand Down
2 changes: 1 addition & 1 deletion modules/queue/unique_queue_wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (q *WrappedUniqueQueue) Push(data Data) error {
// PushFunc will push the data to the internal channel checking it against the exemplar
func (q *WrappedUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
return fmt.Errorf("unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
}

q.tlock.Lock()
Expand Down
4 changes: 1 addition & 3 deletions modules/queue/workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,12 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
ctx, cancel := context.WithCancel(context.Background())

dataChan := make(chan Data, config.QueueLength)
resumed := make(chan struct{})
close(resumed)
pool := &WorkerPool{
baseCtx: ctx,
baseCtxCancel: cancel,
batchLength: config.BatchLength,
dataChan: dataChan,
resumed: resumed,
resumed: closedChan,
paused: make(chan struct{}),
handle: handle,
blockTimeout: config.BlockTimeout,
Expand Down